程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Relationships and functions of coroutines, tasks, and future waiting objects in Python asyncio

編輯:Python

Catalog

Preface

1.Asyncio Entrance

2. Two kinds of Coroutine The difference between calling methods

3.Task And Future

3.1.Future

3.2.Task

4. summary

Preface

Last article 《Python in Async The implementation of syntax coroutine 》 It introduces Python How to implement the collaborative process with a generator and Python Asyncio adopt Future and Task To realize the scheduling of collaborative process , And in the Python Asyncio In Coroutines, Tasks and Future All belong to waiting objects , In use Asyncio In the process of , It often involves the transformation and scheduling of the three , Developers tend to be confused about concepts and functions , This paper mainly expounds the relationship between the three and their functions .

1.Asyncio Entrance

A coroutine is a special case of a thread , The entry and switching of the coordination process are scheduled by the event loop , In the new edition of Python The entrance to the middle process is Asyncio.run, When the program runs to Asyncio.run after , It can be simply understood that the program switches from thread mode to co process mode ( It's just easy to understand , For computers , There is no such distinction ),

The following is an example code of the smallest co process :

import asyncioasync def main(): await asyncio.sleep(0)asyncio.run(main())

In this code ,main Functions and asyncio.sleep All belong to Coroutine,main It's through asyncio.run Calling , Next, the program also enters a co process mode ,asyncio.run The core call is Runner.run, The code is as follows :

class Runner: ... def run(self, coro, *, context=None): """Run a coroutine inside the embedded event loop.""" # Omit code ... # hold coroutine To task task = self._loop.create_task(coro, context=context) # Omit code ... try: # If the incoming is Future perhaps coroutine, It will also be designed for task return self._loop.run_until_complete(task) except exceptions.CancelledError: # Omit code ...

Some other functions and initialization codes have been deleted from this code , You can see that the main function of this function is through loop.create_task How to put a Coroutine Object into a Task object , And then through loop.run_until_complete Wait for this Task End of run .

You can see ,Asycnio It will not be dispatched directly Coroutine, But turn it into Task Then schedule , This is because in the Asyncio The smallest scheduling object in the event loop is Task. But in the Asyncio Not all of them Coroutine All calls will be converted to Task The object waits , For example, the asyncio.sleep, Because it's in main Direct in function await Of , So it won't be converted , But just wait , The figure shown by calling the tool analysis is as follows :

In this diagram , from main Function to asyncio.sleep There is no obvious in the function loop.create_task Wait Coroutine To Task call , The reason why conversion is not needed here is not that some special optimizations have been made , But this is why , This await asyncio.sleep The function will actually be main This Coroutine Converted Task Continue scheduling until .

2. Two kinds of Coroutine The difference between calling methods

In understanding Task Before the scheduling principle of , Let's go back to the original call example , Look, just use Task Call and use directly Coroutine What is the difference between calls .

The following code , We show the execution of a Coroutine To Task Wait for the operation of , Then the code will be as follows :

import asyncioasync def main(): await asyncio.create_task(asyncio.sleep(0))asyncio.run(main())

Such code looks like the original call example , Do not have what difference , But if you make some changes , For example, add some sleep time and Coroutine Call to , Can see Task The role of the object , Now prepare two documents ,

Their code is as follows :

# demo_coro.pyimport asyncioimport timeasync def main(): await asyncio.sleep(1) await asyncio.sleep(2)s_t = time.time()asyncio.run(main())print(time.time() - s_t)# // Output: 3.0028765201568604# demo_task.pyimport asyncioimport timeasync def main(): task_1 = asyncio.create_task(asyncio.sleep(1)) task_2 = asyncio.create_task(asyncio.sleep(2)) await task_1 await task_2s_t = time.time()asyncio.run(main())print(time.time() - s_t)# // Output: 2.0027475357055664

among demo_coro.py Twice await call , The total running time of the program is 3 second , and demo_task.py Is to put two Coroutine Object to Task object , Then do it twice await call , The total running time of the program is 2 second . You can find ,demo_task.py The run time of is similar to the longest running Task Object duration , and demo_coro.py The run time of is approximately two hours Coroutine Total runtime of the object .

The reason why this is the result , Because of the direct awaitCoroutine Object time , This program will wait , until Coroutine After the object is executed, continue to go down , and Task The difference between objects is that at the moment of creation , You have registered yourself in the event loop and wait to be scheduled to run , And then return a task Object for developers to wait , because asyncio.sleep It's pure. IO Type of call , So in this program , Two asyncio.sleepCoroutine Be converted to Task Thus, concurrent call is realized .

3.Task And Future

The reason why the above code passed Task Can realize concurrent call , Because Task There are some functions that interact with the event loop , It is these functions that set up Coroutine The possibility of concurrent calls , however Task yes Future A sub object of , So in understanding Task Before , You need to know Future.

3.1.Future

And Coroutine The only difference between concession and acceptance is Future In addition to the functions of concession and receiving results , It is also a container with state that can only passively make event calls , It is initialized Pending state , It can then be cancelled , Set result and set exception . After the corresponding operation is set ,Future Will be transformed into an irreversible corresponding state , And pass loop.call_sonn To call all callback functions registered on itself , At the same time, it has __iter__ and __await__ Method so that it can be await and yield from call , Its main code is as follows :

class Future: ... def set_result(self, result): """ Set result , And schedule the next call """ if self._state != _PENDING: raise exceptions.InvalidStateError(f'{self._state}: {self!r}') self._result = result self._state = _FINISHED self.__schedule_callbacks() def set_exception(self, exception): """ Set the abnormal , And schedule the next call """ if self._state != _PENDING: raise exceptions.InvalidStateError(f'{self._state}: {self!r}') if isinstance(exception, type): exception = exception() if type(exception) is StopIteration: raise TypeError("StopIteration interacts badly with generators " "and cannot be raised into a Future") self._exception = exception self._state = _FINISHED self.__schedule_callbacks() self.__log_traceback = True def __await__(self): """ Set to blocking, And accept await perhaps yield from call """ if not self.done(): self._asyncio_future_blocking = True yield self # This tells Task to wait for completion. if not self.done(): raise RuntimeError("await wasn't used with future") return self.result() # May raise too. __iter__ = __await__ # make compatible with 'yield from'.

Looking at this code alone, it is difficult to understand why the following future Called set_result Then you can continue to go down :

async def demo(future: asyncio.Future): await future print("aha")

This is because Future Follow Coroutine equally , No ability of active scheduling , Only through Task Be scheduled in conjunction with the event loop .

3.2.Task

Task yes Future Subclasses of , Except for inheritance Future All the ways , It also adds two important methods __step and __wakeup, Through these two methods Task Dispatch capability , This is a Coroutine and Future There is no the ,Task The main codes related to scheduling are as follows ( See note... For instructions ):

class Task(futures._PyFuture): # Inherit Python Task implementation # from a Python Future implementation. _log_destroy_pending = True def __init__(self, coro, *, loop=None, name=None, context=None): super().__init__(loop=loop) # Omit some initialization code ... # managed coroutine self._coro = coro if context is None: self._context = contextvars.copy_context() else: self._context = context # adopt loop.call_sonn, stay Task Immediately after initialization, notify the event loop to execute its own... When it is free next time __step function self._loop.call_soon(self.__step, context=self._context) def __step(self, exc=None): coro = self._coro # convenient asyncio introspection _enter_task(self._loop, self) # Call either coro.throw(exc) or coro.send(None). try: if exc is None: # adopt send Pre excitation managed coroutine # At this time, you will only get coroutine yield Return the data or receive a StopIteration It's abnormal # about Future perhaps Task The return is Self result = coro.send(None) else: # Send exception to coroutine result = coro.throw(exc) except StopIteration as exc: # StopIteration representative Coroutine Operation completed if self._must_cancel: # coroutine Cancelled before stopping , You need to perform the cancel operation self._must_cancel = False super().cancel(msg=self._cancel_message) else: # Send the running value to the result value super().set_result(exc.value) # Omit other exception encapsulation ... else: # If no exception is thrown blocking = getattr(result, '_asyncio_future_blocking', None) if blocking is not None: # adopt Future The code can judge , If a _asyncio_future_blocking attribute , It represents the present result yes Future Or is it Task # It means this Task It's wrapped in another one Future perhaps Task # Omit Future Judge ... if blocking: # Represents this Future perhaps Task In a stuck state , # At this time Task Gave up control of the event cycle , Wait for this stuck Future perhaps Task Wake yourself up when the execution is complete result._asyncio_future_blocking = False result.add_done_callback(self.__wakeup, context=self._context) self._fut_waiter = result if self._must_cancel: if self._fut_waiter.cancel(msg=self._cancel_message): self._must_cancel = False else: # Can not be await two new_exc = RuntimeError( f'yield was used instead of yield from ' f'in task {self!r} with {result!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) elif result is None: # Relinquished control of the event loop , Managed on their behalf coroutine There might be a coroutine Running , The next step is to give control to him and the cycle of events # Current coroutine Even if there is no Future perhaps Task, But son Future There may be self._loop.call_soon(self.__step, context=self._context) finally: _leave_task(self._loop, self) self = None # Needed to break cycles when an exception occurs. def __wakeup(self, future): # Other Task and Future This function will be called after completion , Next, do some processing try: # Recycling Future The state of , If Future Something is wrong , Then pass the exception back to yourself future.result() except BaseException as exc: # This may also be a cancellation. self.__step(exc) else: # Task You don't need your own trusteeship Future The result is worth , And the following notes , This makes scheduling faster # Don't pass the value of `future.result()` explicitly, # as `Future.__iter__` and `Future.__await__` don't need it. # If we call `_step(value, None)` instead of `_step()`, # Python eval loop would use `.send(value)` method call, # instead of `__next__()`, which is slower for futures # that return non-generator iterators from their `__iter__`. self.__step() self = None # Needed to break cycles when an exception occurs.

This source code is Task Object __setp The method is longer , the After streamlining, it can be found that he mainly does three jobs :

1. adopt send perhaps throw To drive Coroutine Go to the next step

2. By giving it to people who are in their own custody Future perhaps Task Add a callback to get notification of completion and regain control

3. adopt loop.call_soon To make concessions , Give control to the event loop

Source code analysis alone can be difficult to understand , Here's how Two kinds of Coroutine As an example , A simple statement Task The process of cyclic scheduling with Events , First of all demo_coro, There is only one in this example Task:

# demo_coro.pyimport asyncioimport timeasync def main(): await asyncio.sleep(1) await asyncio.sleep(2)s_t = time.time()asyncio.run(main())print(time.time() - s_t)# // Output: 3.0028765201568604

The first step in this example is to main Turn to one Task, And then called the corresponding. __step Method , Now __step Method will call main() This Coroutine Of send(None) Method .
Then the logic of the whole program will go directly to main Function await asyncio.sleep(1) This Coroutine in ,await asyncio.sleep(1) Mr. Hui becomes a Future object , And pass loop.call_at Tell the event loop in 1 Activate this... In seconds Future object , Then return the object to . Then the logic will return to Task Of __step In the method ,__step Find out send Call to get a Future object , So right here Future Add a callback , Give Way Future Activate yourself when you're done , Then give up control of the event cycle . Then the event loop activates this one second later Future object , Then the program logic will execute to Future The callback , That is to say Task Of __wakeup Method , therefore Task Of __step Called again to , And this time I met the latter await asyncio.sleep(2), So I went through the above process again . When two asyncio.sleep After execution is complete ,Task Of __step The method is right Coroutine Send a send(None) Then I caught StopIteration abnormal , Now Task Would pass set_result Set result , And end your scheduling process .

You can see demo_core.py Only one of them Task In charge of scheduling with the event loop , The beginning of the event cycle must be a Task, And pass Task To tune up a Coroutine, adopt __step Method put the follow-up Future,Task,Coroutine All operate as a chain , and demo_task.py It's different , It has two Task, The code is as follows :

# demo_task.pyimport asyncioimport timeasync def main(): task_1 = asyncio.create_task(asyncio.sleep(1)) task_2 = asyncio.create_task(asyncio.sleep(2)) await task_1 await task_2s_t = time.time()asyncio.run(main())print(time.time() - s_t)# // Output: 2.0027475357055664

The first step in this example is to follow demo_coro equally , But jump to main After the function, there is a difference , First, in this function task1 and task2 Two Task, They will all pass separately __step Methods send Activate the corresponding asyncio.sleepCoroutine, Then wait for the corresponding Future To inform yourself that you have completed . And for creating these two Task Of main Task Come on , adopt main Functional awati task_1 and await task_2 To get their “ control power “. The first is through await task_1 sentence ,main Task Medium __step Method is calling send After that, I got task_1 Corresponding Future, At this time, you can work for this Future Add a callback , Let him inform himself when he finishes , Take the next step by yourself , about task_2 So it is with . Until the last two task All completed ,main Task And caught StopIteration abnormal , adopt set_result Set result , And end your scheduling process .

You can see demo_task.py And demo_coro.py The obvious difference is main Task Two... Are created in the life cycle of the run Task, And pass await Hosted two Task, Two at the same time Task It can also realize the concurrency of two collaborative processes , So it can be found that during the operation of the event loop , The concurrency of the current process is always less than that registered in the event loop Task Number . Besides , If in main Task If there is no explicit await, Then Task Will escape , Not subject to main Task management , as follows :

# demo_task.pyimport asyncioimport timedef mutli_task(): task_1 = asyncio.create_task(asyncio.sleep(1)) task_2 = asyncio.create_task(asyncio.sleep(2))async def main(): mutli_task() await asyncio.sleep(1.5) s_t = time.time()asyncio.run(main())print(time.time() - s_t)# // Output: 1.5027475357055664

In this code ,main Task In carrying out the mutli_task when , Will create two task, But in __step Medium coro.send(None) The result of the call is await asyncio.sleep(1.5) Back to Future, therefore main Task You can only call this Future Of add_don_callback To load your own __wakeup Method , Eventually lead to main Task Can only be hosted to await asyncio.sleep(1.5) Of Future, and mutli_task Created task Then he escaped , Become the vertex of another chain Task.

However, the event loop of this program is only managed to main Task So the event loop will always run , until main Task Exit only at the end of the run , Then the program will exit together , So the running time of the program is only 1.5 About seconds .
In addition, due to other Task It is also registered to the event loop , So the event loop will help put task_1 completion of enforcement , and task_2 The sleep time defined is 2 second , Before the program exits, the event loop will find a Task Not yet completed , So I'll be on this Task Print an alarm and clean it up .

4. summary

In depth Task,Future After understanding the source code of , I understand Task and Future stay Asyncio The role of , It was also found that Task and Future All follow loop There is a certain coupling , and loop You can also create Task and Future, So if you want to really understand Asyncio The scheduling principle of , One more step is needed , adopt Asyncio To understand the whole Asyncio The design of the .

This is about Python Asyncio in Coroutines,Tasks,Future This is the end of the article on the relationship and role of waiting objects , More about Python Asyncio Please search the previous articles of software development network or continue to browse the relevant articles below. I hope you will support software development network more in the future !



  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved