
Implementation of async syntax coroutine in Python


Catalog

1. A conventional synchronous request

2. Asynchronous requests

3. Generator-based coroutines

3.1 Generators

3.2 Implementing coroutines with generators

In I/O-heavy scenarios, a program written with async syntax can complete the same task in less time and with fewer resources. This article explains how Python's async-syntax coroutines are implemented.

1. A conventional synchronous request

As usual, before looking at how async syntax is implemented, let's start with a synchronous example. Suppose there is an HTTP request; the program fetches the corresponding response and prints it. The code is as follows:

import socket


def request(host: str) -> None:
    """Simulate the request and print the response body"""
    url: str = f"http://{host}"
    sock: socket.SocketType = socket.socket()
    sock.connect((host, 80))
    sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))
    response_bytes: bytes = b""
    chunk: bytes = sock.recv(4096)
    while chunk:
        response_bytes += chunk
        chunk = sock.recv(4096)
    print("\n".join(response_bytes.decode().split("\r\n")))


if __name__ == "__main__":
    request("so1n.me")

Run the program and it prints normally: the upper part is the HTTP response headers, the lower part is the response body. You can see the server asking us to re-request over https. The output is as follows:

HTTP/1.1 301 Moved Permanently
Server: GitHub.com
Content-Type: text/html
Location: https://so1n.me/
X-GitHub-Request-Id: A744:3871:4136AF:48BD9F:6188DB50
Content-Length: 162
Accept-Ranges: bytes
Date: Mon, 08 Nov 2021 08:11:37 GMT
Via: 1.1 varnish
Age: 104
Connection: close
X-Served-By: cache-qpg1272-QPG
X-Cache: HIT
X-Cache-Hits: 2
X-Timer: S1636359097.026094,VS0,VE0
Vary: Accept-Encoding
X-Fastly-Request-ID: 22fa337f777553d33503cee5282598c6a293fb5e

<html>
<head><title>301 Moved Permanently</title></head>
<body>
<center><h1>301 Moved Permanently</h1></center>
<hr><center>nginx</center>
</body>
</html>

This article is not about how HTTP requests are implemented, so the details don't matter. What matters in this code is that socket calls are blocking by default: when a thread calls connect or recv, the program pauses until the operation completes. (send itself does not need to wait, although in real code you should first drain the send buffer before sending again; this small demo does not need a drain step.) When downloading many web pages at once, as in the earlier article, most of the time is spent waiting on I/O while the CPU sits idle. A thread pool can mitigate this, but the cost is high, and the operating system limits how many threads a process, a user, or a machine may use. Coroutines have none of these restrictions: they occupy far fewer resources and hit no such system bottleneck.
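To make the "waiting dominates" point concrete, here is a small sketch (not from the original article) comparing a blocking wait run serially versus through a thread pool; `time.sleep` merely stands in for a blocking call such as `recv`:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(delay: float) -> float:
    """Stand-in for a blocking socket call such as recv()."""
    time.sleep(delay)
    return delay


# Serial: four 0.1 s waits add up to roughly 0.4 s
start = time.monotonic()
serial = [blocking_io(0.1) for _ in range(4)]
serial_elapsed = time.monotonic() - start

# Thread pool: the four waits overlap, finishing in roughly 0.1 s
start = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as pool:
    pooled = list(pool.map(blocking_io, [0.1] * 4))
pool_elapsed = time.monotonic() - start

print(f"serial {serial_elapsed:.2f}s, pooled {pool_elapsed:.2f}s")
```

The pool wins here, but each thread still costs its own stack and a kernel scheduling slot, which is exactly the overhead coroutines avoid.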

2. Asynchronous requests

Asynchrony lets a single thread handle concurrent operations, but as noted above, sockets block by default, so we need to switch the socket to non-blocking mode. socket provides the setblocking method for exactly this choice. Once the socket is non-blocking, the way we call connect and recv has to change as well.

Because nothing blocks, the program returns from connect immediately. However, in the underlying C implementation, calling connect on a non-blocking socket raises an exception before the connection is established, so we need to catch it, like this:

import socket

sock: socket.SocketType = socket.socket()
sock.setblocking(False)
try:
    sock.connect(("so1n.me", 80))
except BlockingIOError:
    pass

After all that, the connection is being established, but we still don't know when it will complete. Calling send before the connection is ready raises an error, so one option is to keep polling send until no error means success (real code would also need a timeout):

while True:
    try:
        sock.send(request)
        break
    except OSError:
        pass

But this makes the CPU spin idly, wasting performance, and meanwhile the thread can do nothing else. It's like phoning the restaurant over and over after ordering takeout to ask whether the food is ready — a waste of phone calls. If instead the restaurant calls us when the meal is done, there is exactly one call, which is far more economical (and is how it normally works).
This is where the event loop comes in. Unix-like systems provide a function called select, which waits for an event to occur before invoking the registered listener. Its original implementation did not perform well and was superseded on Linux by epoll, though the interface is similar. Python wraps these different event mechanisms in the selectors library, whose DefaultSelector picks the best select-like function available on the system.
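You can check which implementation DefaultSelector resolved to on your machine; the concrete class depends on the platform (for example EpollSelector on Linux, KqueueSelector on macOS, with SelectSelector as the portable fallback):

```python
import selectors

# DefaultSelector is an alias for the most efficient selector on this platform
selector = selectors.DefaultSelector()
print(type(selector).__name__)  # e.g. 'EpollSelector' on Linux
selector.close()
```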
Leaving the internals of the event loop aside for now, its name captures its two essential parts: the events and the loop. In Python, you can register an event with the event loop like this:

def demo():
    pass

selector.register(fd, EVENT_WRITE, demo)

The event loop will watch the corresponding file descriptor fd, and when it triggers a write event (EVENT_WRITE), the loop will tell us that the registered function demo can be called. If you run the code above as is, though, the program seems to exit without doing anything. It actually did run: the registration completed, and the loop is now waiting for the developer to collect its events and take the next step. So we just need to add the following at the end of the code:

while True:
    for key, mask in selector.select():
        key.data()

Now the program runs continuously. Whenever an event is captured, the for loop hands it to us; key.data is the callback function we registered, so when the event occurs, we fetch the callback and run it. With that understood, we can write our first concurrent program, which implements a small piece of I/O multiplexing logic. The code and comments are as follows:

import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

# Select the event loop implementation
selector: DefaultSelector = DefaultSelector()
# Used to determine whether there are events still running
running_cnt: int = 0


def request(host: str) -> None:
    """Simulate the request and print the response body"""
    # Tell the main loop that this request is still running
    global running_cnt
    running_cnt += 1

    # Initialize the socket
    url: str = f"http://{host}"
    sock: socket.SocketType = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect((host, 80))
    except BlockingIOError:
        pass

    response_bytes: bytes = b""

    def read_response() -> None:
        """Receive a chunk of the response and check whether the request has finished"""
        nonlocal response_bytes
        chunk: bytes = sock.recv(4096)
        print(f"recv {host} body success")
        if chunk:
            response_bytes += chunk
        else:
            # No data means the request is over; stop listening
            selector.unregister(sock.fileno())
            global running_cnt
            running_cnt -= 1

    def connected() -> None:
        """Callback invoked when the socket connection is established"""
        # Cancel the write watch
        selector.unregister(sock.fileno())
        print(f"{host} connect success")
        # Send the request, listen for read events, and register the response handler
        sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))
        selector.register(sock.fileno(), EVENT_READ, read_response)

    selector.register(sock.fileno(), EVENT_WRITE, connected)


if __name__ == "__main__":
    # Issue multiple requests at the same time
    request("so1n.me")
    request("github.com")
    request("google.com")
    request("baidu.com")
    # Loop while events are still running
    while running_cnt > 0:
        # Wait for the event loop to report completed events
        for key, mask in selector.select():
            key.data()

This code first registers four requests along with their connection callbacks, then enters the event loop, handing control over to it. When the loop reports that a socket connection has been established, the program unregisters that callback, sends the request, registers a read-event callback, and hands control back to the loop. Response data is processed only as it arrives, and the program exits only after every response has been fully received.

Here is the output from one of my runs:

so1n.me connect success
github.com connect success
google.com connect success
recv google.com body success
recv google.com body success
baidu.com connect success
recv github.com body success
recv github.com body success
recv baidu.com body success
recv baidu.com body success
recv so1n.me body success
recv so1n.me body success

You can see that the execution order is random rather than strictly so1n.me, github.com, google.com, baidu.com, and that everything completes quickly: the whole program takes roughly as long as the slowest response.
But you can also see that this program needs two callbacks. Callbacks make code strange and hurt readability, easily lead to callback hell, and when a callback raises an error it is hard to tell what caused it, because the context is lost — debugging becomes very confusing. As programmers, we are rarely satisfied with code that is merely fast. What we really want is code that is fast and as simple and readable as synchronous code, and easy to debug as well. The design pattern that combines these properties is the coroutine.

Coroutines appeared very early. Unlike threads, they are not scheduled by the system: a coroutine suspends itself voluntarily and resumes when the event loop notifies it. Because coroutines are implemented at the software level, there are many ways to build them. Here we discuss generator-based coroutines, because generators, like coroutines, can suspend, yield control, and resume (and can also have errors raised into them with throw), and they closely resemble async-syntax coroutines. By understanding generator-based coroutines, you can see how async coroutines are implemented.

3. Generator-based coroutines

3.1 Generators

Before looking at generator-based coroutines, you first need to understand generators. A Python generator function differs from an ordinary function only in containing the yield keyword, and the difference shows up in their bytecode:

In [1]: import dis

# An ordinary function
In [2]: def aaa(): pass

In [3]: dis.dis(aaa)
  1           0 LOAD_CONST               0 (None)
              2 RETURN_VALUE

# An ordinary function that calls another function
In [4]: def bbb():
   ...:     aaa()
   ...:

In [5]: dis.dis(bbb)
  2           0 LOAD_GLOBAL              0 (aaa)
              2 CALL_FUNCTION            0
              4 POP_TOP
              6 LOAD_CONST               0 (None)
              8 RETURN_VALUE

# A plain generator function
In [6]: def ccc(): yield

In [7]: dis.dis(ccc)
  1           0 LOAD_CONST               0 (None)
              2 YIELD_VALUE
              4 POP_TOP
              6 LOAD_CONST               0 (None)
              8 RETURN_VALUE

Above are the bytecodes of an ordinary function, an ordinary function that calls another function, and a plain generator function. The simplest function just pushes the variable None onto its stack with LOAD_CONST and returns it with RETURN_VALUE. The ordinary function with a call first loads the global aaa onto its stack, calls it with CALL_FUNCTION, pops the call's return value with POP_TOP, then pushes None with LOAD_CONST and finally returns it.
The generator function is different: it pushes None with LOAD_CONST, hands the value back with YIELD_VALUE, pops the stack with POP_TOP, pushes None again, and finally returns with RETURN_VALUE. The bytecode makes it clear that yield splits the frame's execution in two: a single function call can return multiple times, which matches exactly a coroutine's need to wait repeatedly.
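This suspension is also visible at runtime: inspect.getgeneratorstate reports whether a generator's frame has been created, is suspended at a yield, is running, or has finished (a small sketch, independent of the article's examples):

```python
import inspect


def gen():
    yield 1
    yield 2


g = gen()
print(inspect.getgeneratorstate(g))  # GEN_CREATED: the frame exists but has not run
g.send(None)                         # advance to the first yield
print(inspect.getgeneratorstate(g))  # GEN_SUSPENDED: paused at a yield
list(g)                              # exhaust the generator
print(inspect.getgeneratorstate(g))  # GEN_CLOSED: the frame has finished
```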

Next, consider a use of the generator. This generator has two yield statements and finally returns the string 'None'. The code is as follows:

In [8]: def demo():
   ...:     a = 1
   ...:     b = 2
   ...:     print('aaa', locals())
   ...:     yield 1
   ...:     print('bbb', locals())
   ...:     yield 2
   ...:     return 'None'
   ...:

In [9]: demo_gen = demo()

In [10]: demo_gen.send(None)
aaa {'a': 1, 'b': 2}
Out[10]: 1

In [11]: demo_gen.send(None)
bbb {'a': 1, 'b': 2}
Out[11]: 2

In [12]: demo_gen.send(None)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-12-8f8cb075d6af> in <module>
----> 1 demo_gen.send(None)

StopIteration: None

The call demo() first produces the generator object demo_gen. The first send returns 1, the second send returns 2, and the third send raises StopIteration with the message None. Notice also that both prints, aaa and bbb, see the local variables of the current function, and they see the same values even though they run in different resumptions of the frame. This means that a generator used to simulate a coroutine still reads its own context correctly — exactly what we need.

In addition, Python supports delegating to another generator with the yield from syntax. The code is as follows:

In [1]: def demo_gen_1():
   ...:     for i in range(3):
   ...:         yield i
   ...:

In [2]: def demo_gen_2():
   ...:     yield from demo_gen_1()
   ...:

In [3]: demo_gen_obj = demo_gen_2()

In [4]: demo_gen_obj.send(None)
Out[4]: 0

In [5]: demo_gen_obj.send(None)
Out[5]: 1

In [6]: demo_gen_obj.send(None)
Out[6]: 2

In [7]: demo_gen_obj.send(None)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-7-f9922a2f64c9> in <module>
----> 1 demo_gen_obj.send(None)

StopIteration:
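One detail the session above does not show, but which the Future implementation later relies on: yield from also forwards the sub-generator's return value to the delegating generator (a minimal sketch):

```python
def inner():
    yield "step"
    return "inner result"  # becomes the value of the yield from expression


def outer():
    result = yield from inner()
    yield result  # re-yield it so the caller can observe it


g = outer()
assert g.send(None) == "step"          # inner's yield passes straight through
assert g.send(None) == "inner result"  # inner's return value lands in result
```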

yield from makes it easy for one generator to call another, so if each generator function is treated as a coroutine, yield from gives a very convenient way for coroutines to call each other. The traceback a generator produces when it raises is also very readable, and generators support throw for injecting an exception from outside — which lets us raise an exception into a running coroutine, for example to cancel it. A demo:

In [1]: def demo_exc():
   ...:     yield 1
   ...:     raise RuntimeError()
   ...:

In [2]: def demo_exc_1():
   ...:     for i in range(3):
   ...:         yield i
   ...:

In [3]: demo_exc_gen = demo_exc()

In [4]: demo_exc_gen.send(None)
Out[4]: 1

In [5]: demo_exc_gen.send(None)
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-5-09fbb75fdf7d> in <module>
----> 1 demo_exc_gen.send(None)

<ipython-input-1-69afbc1f9c19> in demo_exc()
      1 def demo_exc():
      2     yield 1
----> 3     raise RuntimeError()

RuntimeError:

In [6]: demo_exc_gen_1 = demo_exc_1()

In [7]: demo_exc_gen_1.send(None)
Out[7]: 0

In [8]: demo_exc_gen_1.send(None)
Out[8]: 1

In [9]: demo_exc_gen_1.throw(RuntimeError)
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-9-1a1cc55d71f4> in <module>
----> 1 demo_exc_gen_1.throw(RuntimeError)

<ipython-input-2-2617b2366dce> in demo_exc_1()
      1 def demo_exc_1():
      2     for i in range(3):
----> 3         yield i

RuntimeError:

When an exception is raised mid-run, the error is very clear and the stack is easy to follow. Likewise, an exception injected with throw is raised at the yield where the generator is currently suspended. (This is why cancelling a coroutine does not cancel it immediately: the cancellation takes effect at the next suspension point.)
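A generator can also catch the exception injected by throw, which is how a coroutine gets a chance to clean up when cancelled (a sketch; RuntimeError is just an example exception):

```python
def worker():
    try:
        while True:
            yield
    except RuntimeError:
        # the injected exception surfaces at the paused yield,
        # so cleanup code here runs before the generator finishes
        return "cleaned up"


g = worker()
g.send(None)  # advance to the first yield
try:
    g.throw(RuntimeError)
except StopIteration as e:
    print(e.value)  # the generator's return value: "cleaned up"
```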

3.2 Implementing coroutines with generators

We have now seen that the generator is a programming model that fits coroutines very well, and we know which generator APIs we need. Next we can imitate asyncio's interface to implement a simple coroutine framework.

The first piece is Future: asyncio uses it to represent a result that a coroutine is waiting on. Below is a simple Future modeled on asyncio.Future; its API is not as complete as asyncio.Future's. The code and comments are as follows:

import asyncio
from typing import Any, Callable, List, Optional


class Status:
    """Used to track the Future's state"""
    pending: int = 1
    finished: int = 2
    cancelled: int = 3


class Future(object):
    def __init__(self) -> None:
        """On initialization the Future is in the pending state, waiting for set_result"""
        self.status: int = Status.pending
        self._result: Any = None
        self._exception: Optional[Exception] = None
        self._callbacks: List[Callable[['Future'], None]] = []

    def add_done_callback(self, fn: Callable[['Future'], None]) -> None:
        """Add a callback to run on completion"""
        self._callbacks.append(fn)

    def cancel(self) -> bool:
        """Cancel the current Future"""
        if self.status != Status.pending:
            return False
        self.status = Status.cancelled
        for fn in self._callbacks:
            fn(self)
        return True

    def set_exception(self, exc: Exception) -> None:
        """Set an exception as the outcome"""
        if self.status != Status.pending:
            raise RuntimeError("Can not set exc")
        self._exception = exc
        self.status = Status.finished

    def set_result(self, result: Any) -> None:
        """Set the result"""
        if self.status != Status.pending:
            raise RuntimeError("Can not set result")
        self.status = Status.finished
        self._result = result
        for fn in self._callbacks:
            fn(self)

    def result(self) -> Any:
        """Get the result"""
        if self.status == Status.cancelled:
            raise asyncio.CancelledError
        elif self.status != Status.finished:
            raise RuntimeError("Result is not ready")
        elif self._exception is not None:
            raise self._exception
        return self._result

    def __iter__(self):
        """Simulate a coroutine via the generator: suspend until notified, then return the result"""
        if self.status == Status.pending:
            yield self
        return self.result()

You can think of this Future as a state machine. It starts out in the pending state; while running, its state can be switched, and its __iter__ method lets a caller write yield from Future() to wait on the Future itself, until the event notification arrives and the result can be read.
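The real asyncio.Future follows the same state machine. A quick check against the actual API, driven by hand outside a running loop just to show the transitions:

```python
import asyncio

loop = asyncio.new_event_loop()

f = asyncio.Future(loop=loop)
assert not f.done()       # pending

f.set_result(42)
assert f.done()           # finished
assert f.result() == 42

f2 = asyncio.Future(loop=loop)
f2.cancel()
assert f2.cancelled()     # cancelled: result() would now raise CancelledError

loop.close()
```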

But this Future is not self-driving: the program calling __iter__ has no idea when set_result will be called. asyncio drives the Future through a class called Task, which arranges a coroutine's execution and is responsible for running it on the event loop. It does two main things:

1. On initialization, it activates the generator with a first send.

2. Each time it is woken, it immediately schedules the next wait, until the generator raises StopIteration.

There is also a method for cancelling the coroutine it manages. (In the real code, Task inherits from Future, so it has some of Future's behavior.) The simplified code is as follows:

import asyncio
from typing import Generator


class Task:
    def __init__(self, coro: Generator) -> None:
        # Initialize state
        self.cancelled: bool = False
        self.coro: Generator = coro
        # Prime the generator with an already-finished Future
        f: Future = Future()
        f.set_result(None)
        self.step(f)

    def cancel(self) -> None:
        """Cancel the managed coro"""
        self.coro.throw(asyncio.CancelledError)

    def step(self, f: Future) -> None:
        """Drive coro one step further: from the first activation on, register a
        completion callback after every step, until cancellation or StopIteration"""
        try:
            _future = self.coro.send(f.result())
        except asyncio.CancelledError:
            self.cancelled = True
            return
        except StopIteration:
            return
        _future.add_done_callback(self.step)

With Future and Task wrapped up, we can try them out:

In [2]: def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]:
   ...:     result = yield from f
   ...:     print(flag_int, result)
   ...:
   ...: future: Future = Future()
   ...: for i in range(3):
   ...:     coro = wait_future(future, i)
   ...:     # Hand wait_future over to a Task; the inner Future is also managed via yield from
   ...:     Task(coro)
   ...:
   ...: print('ready')
   ...: future.set_result('ok')
   ...:
   ...: future = Future()
   ...: Task(wait_future(future, 3)).cancel()
   ...:
ready
0 ok
1 ok
2 ok
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<ipython-input-2-2d1b04db2604> in <module>
     12
     13 future = Future()
---> 14 Task(wait_future(future, 3)).cancel()

<ipython-input-1-ec3831082a88> in cancel(self)
     81
     82     def cancel(self) -> None:
---> 83         self.coro.throw(asyncio.CancelledError)
     84
     85     def step(self, f: Future) -> None:

<ipython-input-2-2d1b04db2604> in wait_future(f, flag_int)
      1 def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]:
----> 2     result = yield from f
      3     print(flag_int, result)
      4

<ipython-input-1-ec3831082a88> in __iter__(self)
     68         """Simulate a coroutine via the generator: suspend until notified"""
     69         if self.status == Status.pending:
---> 70             yield self
     71         return self.result()
     72

CancelledError:

This program initializes a Future, passes it to wait_future to produce a generator, and hands the generator to a Task, which primes it. Because the Future is bound inside wait_future via yield from, what actually gets primed is the yield self inside Future.__iter__, where the code pauses and yields control back.
After all the generators are primed, calling the Future's set_result switches it to the finished state and runs the registered callbacks, which invoke Task.step and thus send. Control returns to the yield self inside Future.__iter__ and continues on to the return, handing the result back. The output confirms this: after 'ready' is printed, the corresponding results are printed in turn. The final test of the cancel method shows the Future raising an exception, and the traceback is easy to understand — you can follow it straight back to the call site.

Now that Future and Task are up and running, they can be integrated with the program we wrote at the beginning. The code is as follows:

import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from typing import Generator

# Reuses the selector and running_cnt globals from the earlier callback example
selector: DefaultSelector = DefaultSelector()
running_cnt: int = 0


class HttpRequest(object):
    def __init__(self, host: str):
        """Initialize variables and the socket"""
        self._host: str = host
        global running_cnt
        running_cnt += 1
        self.url: str = f"http://{host}"
        self.sock: socket.SocketType = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect((host, 80))
        except BlockingIOError:
            pass

    def read(self) -> Generator[Future, None, bytes]:
        """Read response data from the socket: the read callback sets it on a Future,
        and Future.__iter__ hands it back through chunk_future"""
        f: Future = Future()
        selector.register(self.sock.fileno(), EVENT_READ,
                          lambda: f.set_result(self.sock.recv(4096)))
        chunk_future = yield from f
        selector.unregister(self.sock.fileno())
        return chunk_future  # type: ignore

    def read_response(self) -> Generator[Future, None, bytes]:
        """Receive the response and determine whether the request has finished"""
        response_bytes: bytes = b""
        chunk = yield from self.read()
        while chunk:
            response_bytes += chunk
            chunk = yield from self.read()
        return response_bytes

    def connected(self) -> Generator[Future, None, None]:
        """Wait until the socket connection is established"""
        f: Future = Future()
        selector.register(self.sock.fileno(), EVENT_WRITE, lambda: f.set_result(None))
        yield f
        selector.unregister(self.sock.fileno())
        print(f"{self._host} connect success")

    def request(self) -> Generator[Future, None, None]:
        # Send the request, listen for read events, and receive the response
        yield from self.connected()
        self.sock.send(f"GET {self.url} HTTP/1.0\r\nHost: {self._host}\r\n\r\n".encode("ascii"))
        response = yield from self.read_response()
        print(f"request {self._host} success, length:{len(response)}")
        global running_cnt
        running_cnt -= 1


if __name__ == "__main__":
    # Issue multiple requests at the same time
    Task(HttpRequest("so1n.me").request())
    Task(HttpRequest("github.com").request())
    Task(HttpRequest("google.com").request())
    Task(HttpRequest("baidu.com").request())
    # Loop while events are still running
    while running_cnt > 0:
        # Wait for the event loop to report completed events
        for key, mask in selector.select():
            key.data()

This code uses Future and generator methods to do away with callback functions as far as possible. If you ignore HttpRequest's connected and read methods, the code reads almost exactly like the synchronous version, only yielding control via yield and yield from and getting it back from the event loop. As the earlier exception examples showed, errors are easy to trace, so none of the downsides of callbacks remain: developers can simply write in a synchronous style. Our event loop, however, is a bare-bones example — the socket handling is not encapsulated, and many common APIs are missing. All of that is what Python packages up officially in asyncio, with which we can write nearly perfect async-syntax code.

NOTE: Because generator-based coroutines could not be cleanly distinguished from plain generators used with yield from, Python 3.5 introduced native coroutines with the await syntax.
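For comparison, here is the same waiting pattern in modern async/await syntax, where await plays the role that yield from Future() played above (a minimal runnable sketch; the delays and values are arbitrary):

```python
import asyncio


async def fetch(delay: float, value: int) -> int:
    # 'await' suspends this coroutine just like 'yield from future' did above
    await asyncio.sleep(delay)
    return value


async def main() -> list:
    # gather runs both coroutines concurrently and preserves argument order
    return await asyncio.gather(fetch(0.02, 1), fetch(0.01, 2))


results = asyncio.run(main())
print(results)  # [1, 2]
```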

This concludes the article on how Python's async-syntax coroutines are implemented.


