程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Understand the yield in Python, the role of yield in the coroutine, and implement a simple event loop

編輯:Python

Future and Task object

import uuid
class Future:
def __init__(self, loop):
self._result = None
self._done = False
self._callbacks = []
self._loop = loop
# to _result Attribute assignment ,_result The value of ends the data returned by the time-consuming operation 
def set_result(self, data):
if self._done:
raise RuntimeError("Future Object cannot set value repeatedly ")
self._done = True
self._result = data
if isinstance(data, Future):
self._result = data._result
for callback in self._callbacks:
self._loop.add_ready_task(callback)
# obtain future Object result value 
def result(self):
if self._done:
return self._result
raise RuntimeError("Future object The value result is not ready ")
# await wait for 
def __await__(self):
# yield Its role in asynchronous collaboration is : When executing to calling the system to initiate io After the operation , Pause function execution ,
# Will the current future Object returns , And give up the right of execution 
yield self
return self._result
# Add callback event , When set_result When the method is called , Put the callback object of the coroutine into the event loop for execution 
def add_done_callback(self, callback, *args):
self._callbacks.append(callback)
class Task(Future):
def __init__(self, core, loop):
super(Task, self).__init__(loop)
# core It's a collaborative task 
self.core = core
self._task_id = uuid.uuid4()
self._future = None
# run Method is equivalent to starter , Start the co process task function ,io Time consuming operations must be consistent with future Object to associate , When executed await future When the object 
# await Trigger future Object __await__ Method ,yield Pause function execution , And return to the current future object ,
# t = self.core.send(Node) end of execution , here future Is to perform io Operation of the Future object 
def run(self):
try:
print(f"{
self._task_id} Mission Start execution ")
future = self.core.send(None)
except StopIteration:
self.set_result(self._future)
print(f"{
self._task_id} Cooperation task end of execution ")
print("-" * 50)
# When self.core for the first time send There will be no error when , And will be implemented io In operation future Object returns ,
# future Execution in object io The place of operation is exchanged with the system , When io After the operation is completed, it will call future Object set_result Method ,
# set_result Method take io Result linked to future Properties of the , And put the callback function back into the event loop for execution 
else:
print(f"{
self._task_id} Mission Execute to io The long-running , Give up the executive power , Set up io Callback notice ")
print("-" * 50)
future.add_done_callback(self)
self._future = future

EventLoop Event loop object

import collections
import heapq
import time
from random import random, randint
from threading import Thread
from async_future_task import Future, Task
class EventLoop:
loop = None
# Single case , There can only be one event loop 
def __new__(cls, *args, **kwargs):
if not cls.loop:
cls.loop = super().__new__(cls)
return cls.loop
def __init__(self):
# The task queue is ready to run 
self._ready_que = collections.deque()
# Deferred task list 
self._scheduled = []
self.stop = False
# Create a collaboration task object , And added to the executable queue 
def create_task(self, core, *args):
task = Task(core, self)
self._ready_que.append(task)
return task
# Add tasks to the delayed task queue 
def add_delay_task(self, delay, callback, *args):
t = time.time() + delay
heapq.heappush(self._scheduled, (t, callback, args))
# Add executable tasks to the task queue , This function is mainly for future Object to add callback tasks 
def add_ready_task(self, task, *args):
self._ready_que.append(task)
def run_forever(self):
while True:
self.exec_task()
if self.stop and len(self._scheduled) == 0 and len(self._ready_que) == 0:
break
def stop_exit(self):
self.stop = True
# Perform tasks 
def exec_task(self):
t = time.time()
len_scheduled = len(self._scheduled)
for i in range(len_scheduled):
task = heapq.heappop(self._scheduled)
if task[0] <= t:
self._ready_que.append((task[1], task[2]))
else:
heapq.heappush(self._scheduled, task)
break
len_ready = len(self._ready_que)
for i in range(len_ready):
task = self._ready_que.popleft()
# If it is task yes Task Object, then execute run Method 
if isinstance(task, Task):
task.run()
# If not Task Object words Just put task Execute as a function 
else:
task[0](*task[1])
# This is the user layer , Users only need await The asynchronous method of the framework is ok ,
# It doesn't need how the bottom of the relationship framework is implemented 
async def get_baidu():
# Calling fake_io Wait for future object , It's going to trigger future Object __await__ Method , Again because __await__
# Methods include yield , It pauses the execution of the function , return future Self object 
data = await aiohttp_request_url()
print(" Asynchronous task end , io The value obtained by the operation is : ", data)
return data
# aiohttp_request_url The simulation is asynchronous http request ,
# This method simulates the framework encapsulated 、 Execute the call system io Steps for 
async def aiohttp_request_url():
# establish future Wait for the person 
future = Future(loop)
# perform io The long-running , Don't wait at this time , Just call , Don't wait for , Trusteeship time-consuming operations to the system ,
# The system has finished executing io The long-running , Automatic callback future set_result Method , fake_io The simulation calls the system to initiate io operation , The system automatically calls back the result 
fake_io(future)
data = await future
# Can be in await Get data Or do some data processing 
return data
def fake_io(future):
def sleep():
global task_run_time
# Random sleep 0-1 second 
task_time = random()
task_run_time += task_time
time.sleep(task_time)
# io Time consuming operation execution completed , Simulate system callback set_result Method , to future Object to set random values 
data = randint(1, 10)
future.set_result(data)
Thread(target=sleep).start()
loop = EventLoop()
start_time = time.time()
task_run_time = 0
for _ in range(1000):
loop.create_task(get_baidu())
loop.add_delay_task(2, loop.stop_exit)
loop.run_forever()
print(f" Execution time of all tasks :{
task_run_time}, Actual execution time {
time.time() - start_time}")

Understanding chart


Reference resources b Station boss DavyCloud asyncio Series of tutorials


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved