程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Pythonasyncio scheduling philosophy details

編輯:Python

Catalog

Preface

1. Basic introduction

2.EventLoop Scheduling implementation of

3. The Internet IO The handling of events

Preface

In the article 《Python Asyncio in Coroutines,Tasks,Future Relationship and function of waiting objects 》 This paper introduces the in Python The waiting object of , especially Task Objects can be self - driven at startup , But one Task Object can only drive one execution chain , If you want multiple chains to execute ( Concurrent ), Still need EventLoop To schedule the drive , The next step will be through Python.Asyncio Library source code to understand EventLoop How it works .

1. Basic introduction

Python.Asyncio It is a large and comprehensive library , It includes many functions , The logic related to the core scheduling includes three kinds of waiting objects , There are other functions , They are located in runners.py,base_event.py,event.py In three files .

runners.py File has a main class --Runner, Its main responsibility is to do a good job in initializing the event cycle that enters the collaborative process mode , And cleaning up the in memory processes when exiting the process mode , Generators and other objects .

The co process model is just for easy understanding , For computers , There is no such distinction

event.py The files are stored in addition to EventLoop Object interface and get and set EventLoop Outside the function of , There are two EventLoop Schedulable objects , Respectively Handler and TimerHandler, They can be thought of as EvnetLoop Calling containers for other objects , Used to connect the relationship between the object to be scheduled and the event loop , But their implementation is very simple , about Handler, Its source code is as follows :

# Some unwanted code has been removed class Handle: def __init__(self, callback, args, loop, context=None): # Initialization context , Make sure you can find Handle In the context of if context is None: context = contextvars.copy_context() self._context = context self._loop = loop self._callback = callback self._args = args self._cancelled = False def cancel(self): # Set up current Handle To cancel the status if not self._cancelled: self._cancelled = True self._callback = None self._args = None def cancelled(self): return self._cancelled def _run(self): # Used to execute real functions , And through context.run Method to ensure that... Is executed in its own context . try: # Keep executing the corresponding callback in your own context self._context.run(self._callback, *self._args) except (SystemExit, KeyboardInterrupt): raise except BaseException as exc: cb = format_helpers._format_callback_source( self._callback, self._args) msg = f'Exception in callback {cb}' context = { 'message': msg, 'exception': exc, 'handle': self, } self._loop.call_exception_handler(context)

Through the source code can be found ,Handle The function is very simple , Provides functions that can be canceled and executed in their own context , and TimerHandle Inherited from Handle Than Handle There are some more parameters related to time and sorting , Source code is as follows :

class TimerHandle(Handle): def __init__(self, when, callback, args, loop, context=None): super().__init__(callback, args, loop, context) self._when = when self._scheduled = False def __hash__(self): return hash(self._when) def __lt__(self, other): if isinstance(other, TimerHandle): return self._when < other._when return NotImplemented def __le__(self, other): if isinstance(other, TimerHandle): return self._when < other._when or self.__eq__(other) return NotImplemented def __gt__(self, other): if isinstance(other, TimerHandle): return self._when > other._when return NotImplemented def __ge__(self, other): if isinstance(other, TimerHandle): return self._when > other._when or self.__eq__(other) return NotImplemented def __eq__(self, other): if isinstance(other, TimerHandle): return (self._when == other._when and self._callback == other._callback and self._args == other._args and self._cancelled == other._cancelled) return NotImplemented def cancel(self): if not self._cancelled: # Used to notify the event loop of the current Handle It's exited self._loop._timer_handle_cancelled(self) super().cancel() def when(self): return self._when

It can be found through the code , These two objects are very simple , And we're using Python.Asyncio These two objects are not directly used by , But through loop.call_xxx A series of methods to encapsulate the call into Handle object , And then wait EventLoop perform . therefore loop.call_xxx A series of methods can be considered as EventLoop Registration operation of , Basically all non IO All asynchronous operations of the need to pass loop.call_xxx Method to register your calls to EventLoop in , such as Task Object is initialized by calling loop.call_soon Method to register to EventLoop in ,loop.call_sonn Is very simple to implement ,

Its source code is as follows :

class BaseEventLoop: ... def call_soon(self, callback, *args, context=None): # Check whether the event loop is closed , If so, throw an exception directly self._check_closed() handle = self._call_soon(callback, args, context) return handle def _call_soon(self, callback, args, context): # Encapsulate the call into a handle, This makes it easy to be called by the event loop handle = events.Handle(callback, args, self, context) # Add one handle To _ready, Waiting to be called self._ready.append(handle) return handle

You can see call_soon The only really relevant code is 10 A few lines , It is responsible for encapsulating a call into a Handle, To add to self._reday in , So as to register the call into the event loop .

loop.call_xxx A series of functions except loop.call_soon Outside the series of functions , There are two other ways --loop.call_at and loop.call_later, They are similar to loop.call_soon, But there is one more time parameter , To tell EventLoop When can I call , At the same time through loop.call_at and loop.call_later The registered call will pass Python Heap sorting module of headpq Sign up to self._scheduled variable ,

The specific code is as follows :

class BaseEventLoop: ... def call_later(self, delay, callback, *args, context=None): if delay is None: raise TypeError('delay must not be None') timer = self.call_at(self.time() + delay, callback, *args, context=context) return timer def call_at(self, when, callback, *args, context=None): if when is None: raise TypeError("when cannot be None") self._check_closed() # Create a timer handle, Then add to the... Of the event loop _scheduled in , Waiting to be called timer = events.TimerHandle(when, callback, args, self, context) heapq.heappush(self._scheduled, timer) timer._scheduled = True return timer2.EventLoop Scheduling implementation of

In the article 《Python Asyncio in Coroutines,Tasks,Future Relationship and function of waiting objects 》 It has been analyzed in runner Will pass loop.run_until_complete To call mainTask To turn on EventLoop The scheduling , So I'm analyzing EventLoop When scheduling , We should start with loop.run_until_complete Starting with ,

The corresponding source code is as follows :

class BaseEventLoop: def run_until_complete(self, future): ... new_task = not futures.isfuture(future) # hold coroutine convert to task, So the event loop can be scheduled , The minimum scheduling unit of the event cycle is task # It should be noted that the event loop is not registered in the global variable at this time , Therefore, the information that needs to be displayed is passed in , # meanwhile Task Object registration , Have gone through loop.call_soon Register yourself in the event loop , Waiting for dispatch future = tasks.ensure_future(future, loop=self) if new_task: # An exception is raised if the future didn't complete, so there # is no need to log the "destroy pending task" message future._log_destroy_pending = False # When it's time to task When finished , It means that the current event loop has lost the scheduling object , Unable to continue scheduling , So you need to close the current event loop , The program will return from CO process mode to thread mode future.add_done_callback(_run_until_complete_cb) try: # The event loop starts running self.run_forever() except: if new_task and future.done() and not future.cancelled(): # The coroutine raised a BaseException. Consume the exception # to not log a warning, the caller doesn't have access to the # local task. future.exception() raise finally: future.remove_done_callback(_run_until_complete_cb) if not future.done(): raise RuntimeError('Event loop stopped before Future completed.') return future.result() def run_forever(self): # Do some initialization work self._check_closed() self._check_running() self._set_coroutine_origin_tracking(self._debug) self._thread_id = threading.get_ident() old_agen_hooks = sys.get_asyncgen_hooks() # adopt asyncgen Hook to automatically close asyncgen function , This will remind the user that the generator has not been closed sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, finalizer=self._asyncgen_finalizer_hook) try: # Set the currently running event to loop to the global variable , So you can get the current event loop at any stage events._set_running_loop(self) while True: # It's the logic of the task self._run_once() if self._stopping: break finally: # Turn off the cycle , And clean up some resources self._stopping = False self._thread_id = None events._set_running_loop(None) self._set_coroutine_origin_tracking(False) sys.set_asyncgen_hooks(*old_agen_hooks)

The source code is not complicated , Its main logic is to put Corotinue Turn to one Task object , And then through Task Called when the object is initialized loop.call_sonn Method to register yourself to EventLoop in , Last pass loop.run_forever The loop code in keeps running , until _stopping Marked as True:

while True: # It's the logic of the task self._run_once() if self._stopping: break

It can be seen that , This code is to ensure that the event loop can always be executed , The automatic cycle ends , The core of real scheduling is _run_once function ,

Its source code is as follows :

class BaseEventLoop: ... def _run_once(self): # self._scheduled It's a list , It only stores TimerHandle sched_count = len(self._scheduled) ############################### # The first stage , Arrangement self._scheduled # ############################### if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION): # When the number of tasks to be scheduled exceeds 100 And the tasks to be cancelled account for 50% when , To enter this logic # Remove the tasks that need to be cancelled new_scheduled = [] for handle in self._scheduled: if handle._cancelled: # Set up handle Of _cancelled by True, And the handle from _scheduled Remove handle._scheduled = False else: new_scheduled.append(handle) # Rearrange heap heapq.heapify(new_scheduled) self._scheduled = new_scheduled self._timer_cancelled_count = 0 else: # Need to cancel handle Not much , Will only follow this logic , This place will top the pile handle eject , And marked as non schedulable , But the entire heap is not accessed while self._scheduled and self._scheduled[0]._cancelled: self._timer_cancelled_count -= 1 handle = heapq.heappop(self._scheduled) handle._scheduled = False ################################# # The second stage , Calculate timeout values and wait events IO # ################################# timeout = None # When there are ready to dispatch handle Or when it is shutting down , Don't wait for , It is convenient to dispatch as soon as possible if self._ready or self._stopping: timeout = 0 elif self._scheduled: # Compute the desired timeout. # If there is data in the heap , Through the top of the pile handle Calculate the shortest timeout , But not more than MAXIMUM_SELECT_TIMEOUT, To avoid exceeding system limits when = self._scheduled[0]._when timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) # The event loop waits for events , Until there is an event or timeout event_list = self._selector.select(timeout) ################################################## # The third stage , Put what meets the conditions TimeHandle Put in self._ready in # ################################################## # Get the callback of the obtained event , Then fill it to _ready self._process_events(event_list) # Put some in self._scheduled And meet the dispatching conditions handle Put it in _ready in , such as TimerHandle. # end_time For the current time + A unit of time , Guess is to be able to deal with more events during this period end_time = self.time() + self._clock_resolution while self._scheduled: handle = self._scheduled[0] if handle._when >= end_time: break handle = heapq.heappop(self._scheduled) handle._scheduled = False self._ready.append(handle) ################################################################################ # The fourth stage , Traverse all ready to schedule handle, And through handle Of context To execute handle Corresponding callback # ################################################################################ ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() # If handle Has been cancelled , Then... Is not called if handle._cancelled: continue if self._debug: try: self._current_handle = handle t0 = self.time() handle._run() dt = self.time() - t0 if dt >= self.slow_callback_duration: # A callback that takes too long , recorded , These need to be optimized by the developers themselves logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt) finally: self._current_handle = None else: handle._run() handle = None # Needed to break cycles when an exception occurs.

Through source code analysis , It is clear that the first step in the scheduling logic is to regularize first self._scheduled, In the process of regularization, heap sorting is used , Because heap sorting is very efficient in the scheduling scenario , But there are two kinds of regular code , My guess is that direct traversal is more efficient when there are too many cancellations . In order self._scheduled after , Go to the second step , This step starts waiting for the system event cycle to return the corresponding event , If self._ready There's data in , Just don't wait , Need to go to the next step immediately , So that we can arrange the dispatch as soon as possible . After getting the events from the system event cycle , Then we went to the third step , This step will pass self._process_events Method to handle the corresponding event , The callback corresponding to the event is stored in the self._ready in , Finally, I'll go through self._ready All in Handle And do it one by one ( It can be considered as EventLoop Return control to the corresponding calling logic ), At this point, a complete scheduling logic is over , And enter the next scheduling logic .

3. The Internet IO The handling of events

notes : Due to the limitation of system event cycle , So the document IO Generally, multithreading is used to execute , Specific view :github.com/python/asyn…

Based on the analysis of EventLoop The scheduling implementation ignores self._process_events Concrete implementation logic , because _process_events The way is asyncio.base_event.py In the document BaseEventLoop Class has no concrete implementation , Because of the Internet IO Related events need to be handled by the system's event loop , So the logic related to the system event loop is asyncio.selector_events.py Medium BaseSelectorEventLoop Class .BaseSelectorEventLoop Class encapsulation selector The module interacts with the system event loop , So that the caller doesn't have to think about sock And sock The generated file descriptors are monitored and logged off , Let's say BaseSelectorEventLoop The built-in pipe As an example , analysis BaseSelectorEventLoop How to network IO Event handling .

Before analysis , Let's look at an example , The code is as follows :

import asyncioimport threadingdef task(): print("task")def run_loop_inside_thread(loop): loop.run_forever()loop = asyncio.get_event_loop()threading.Thread(target=run_loop_inside_thread, args=(loop,)).start()loop.call_soon(task)

If you run this example directly , It doesn't output task( But in the IDE Use DEBUG Thread startup will be slower in mode , So it will be output ), Because in calling loop.run_forever after EventLoop Will always be stuck in this logic :

event_list = self._selector.select(timeout)

So call loop.call_soon It doesn't make EventLoop Arrange the dispatch immediately , And if call_soon Switch to call_soon_threadsafe Can output normally , This is because call_soon_threadsafe One more self._write_to_self Call to , Its source code is as follows :

class BaseEventLoop: ... def call_soon_threadsafe(self, callback, *args, context=None): """Like call_soon(), but thread-safe.""" self._check_closed() handle = self._call_soon(callback, args, context) self._write_to_self() return handle

Because this call involves IO dependent , So we need to get to BaseSelectorEventLoop Class view , Next, let's say pipe Related networks IO Operation to analyze EventLoop How to deal with IO The event ( Just demonstrate reader object ,writer Object manipulation and reader similar ),

The corresponding source code is as follows :

class BaseSelectorEventLoop(base_events.BaseEventLoop): ####### # establish # ####### def __init__(self, selector=None): super().__init__() if selector is None: # Get the best selector selector = selectors.DefaultSelector() self._selector = selector # establish pipe self._make_self_pipe() self._transports = weakref.WeakValueDictionary() def _make_self_pipe(self): # establish Pipe Corresponding sock self._ssock, self._csock = socket.socketpair() # Set up sock Non blocking self._ssock.setblocking(False) self._csock.setblocking(False) self._internal_fds += 1 # Blocking server sock Read the callback corresponding to the event self._add_reader(self._ssock.fileno(), self._read_from_self) def _add_reader(self, fd, callback, *args): # Check that the event loop is closed self._check_closed() # The encapsulation callback is handle object handle = events.Handle(callback, args, self, None) try: key = self._selector.get_key(fd) except KeyError: # If there is no event loop registered to the system , Then register self._selector.register(fd, selectors.EVENT_READ, (handle, None)) else: # If you have already registered , Update mask, (reader, writer) = key.events, key.data self._selector.modify(fd, mask | selectors.EVENT_READ, (handle, writer)) if reader is not None: reader.cancel() return handle def _read_from_self(self): # Responsible for consumption sock data while True: try: data = self._ssock.recv(4096) if not data: break self._process_self_data(data) except InterruptedError: continue except BlockingIOError: break ####### # Delete # ####### def _close_self_pipe(self): # Cancellation Pipe Corresponding descriptor self._remove_reader(self._ssock.fileno()) # close sock self._ssock.close() self._ssock = None self._csock.close() self._csock = None self._internal_fds -= 1 def _remove_reader(self, fd): # If the event loop has been closed , You don't have to operate if self.is_closed(): return False try: # Query whether the file descriptor is in selector in key = self._selector.get_key(fd) except KeyError: # Returns if it does not exist return False else: # If it exists, it will enter the work of removal mask, (reader, writer) = key.events, key.data # Determine whether there are other events through the event mask mask &= ~selectors.EVENT_READ if not mask: # Remove registered to selector File descriptor for self._selector.unregister(fd) else: # Remove registered to selector File descriptor for , And register new events self._selector.modify(fd, mask, (None, writer)) # If reader Not empty , Then cancel reader if reader is not None: reader.cancel() return True else: return False

You can see from the creation section of the source code ,EventLoop At startup, a pair of established communication will be created sock, And set to non blocking , Then encapsulate the corresponding callback into a Handle Object and register it in the system event loop ( To delete, perform the corresponding reverse operation ), After that, the system event loop will always listen for the corresponding event , That is to say EventLoop The execution logic of will be blocked in the following calls , Wait for the event response :

event_list = self._selector.select(timeout)

At this point, if you execute loop.call_soon_threadsafe, Then it will pass write_to_self Write a little information :

def _write_to_self(self): csock = self._csock if csock is None: return try: csock.send(b'\0') except OSError: if self._debug: logger.debug("Fail to write a null byte into the self-pipe socket", exc_info=True)

because csock Data is written , So this corresponds to ssock You will receive a read event , The system event loop will return the data after receiving the event notification , then EventLoop The corresponding data will be obtained , And to process_events Methods to deal with ,

Its relevant codes are as follows :

class BaseSelectorEventLoop: def _process_events(self, event_list): for key, mask in event_list: # Get the corresponding data from the callback event ,key.data At the time of registration, he was a Yuanzu , So here we need to unpack Yuanzu fileobj, (reader, writer) = key.fileobj, key.data if mask & selectors.EVENT_READ and reader is not None: # obtain reader handle, If it is marked as cancelled , Remove the corresponding file descriptor if reader._cancelled: self._remove_reader(fileobj) else: # If it is not marked as cancelled , Then arrange to self._ready in self._add_callback(reader) if mask & selectors.EVENT_WRITE and writer is not None: # For write objects , The same is true . if writer._cancelled: self._remove_writer(fileobj) else: self._add_callback(writer) def _add_callback(self, handle): # Put the callback handle Add to _ready in assert isinstance(handle, events.Handle), 'A Handle is required here' if handle._cancelled: return assert not isinstance(handle, events.TimerHandle) self._ready.append(handle) def _remove_reader(self, fd): # If the event loop has been closed , You don't have to operate if self.is_closed(): return False try: # Query whether the file descriptor is in selector in key = self._selector.get_key(fd) except KeyError: # Returns if it does not exist return False else: # If it exists, it will enter the work of removal mask, (reader, writer) = key.events, key.data mask &= ~selectors.EVENT_READ if not mask: # Remove registered to selector File descriptor for self._selector.unregister(fd) else: self._selector.modify(fd, mask, (None, writer)) if reader is not None: reader.cancel() return True else: return False

As you can see from the code _process_events The file descriptor corresponding to the event will be processed , And get the corresponding... From the event callback Handle Object added to self._ready in , from EventLoop In the next iteration self._ready And implement .

You can see the network IO The handling of events is not complicated , Because the system event loop has done a lot of work for us , But the user owns the network IO Related operations need a similar operation , This is very tedious , fortunately asyncio The library has been encapsulated for us , We just need to call , A lot of convenience .

This is about Python Asyncio That's all for the detailed article on dispatching principle , More about Python Asyncio Please search the previous articles of software development network or continue to browse the relevant articles below. I hope you will support software development network more in the future !



  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved