
Through Hot Days and Cold Nights: Building a Concurrent, Asynchronous, Dynamic Timed-Task Polling Service with Python3 + Tornado6 + APScheduler/Celery


Reposted from 「劉悅的技術博客」 https://v3u.cn/a_id_220

Typical scenarios for timed tasks turn up in every industry. Take payment systems: during the payment flow, network problems or other factors can cause an order to be dropped or stuck, leaving the bill as a "one-sided account" (單邊賬). For the paying user this is a disaster-level experience: the money has clearly been paid and deducted, yet the order status never changes. So the payment flow of every order needs a fallback that polls periodically; once something goes wrong with a payment, the polling service can discover it and correct the order status in time.

Another example: a previous article, "Divide and conquer: building large-file chunked uploads on Vue.js 3.0 + Ant-design + Tornado 6 pure async IO for efficient writes", dealt with chunked transfer of very large files. If anything goes wrong while uploading or merging the chunks, the oversized file may never be fully assembled on the server, wasting a great deal of system bandwidth. So each chunked-transfer task likewise needs a periodic poll "watching" it, to catch problems as they happen.

In real business scenarios, timed services mostly exist as auxiliary services of the main application, and different scheduled tasks may run on different schedules. If timed tasks can be invoked concurrently and asynchronously alongside the main service, a single application can support tens of thousands, even over a hundred thousand, scheduled tasks, each with its own independent schedule. Below, Tornado is paired first with APScheduler and then with Celery to demonstrate two different asynchronous timed-task invocation strategies.

APScheduler

APScheduler (Advanced Python Scheduler) is an excellent Python 3 timed-task framework. It supports not only concurrent asynchronous invocation of timed tasks, but also dynamic management of scheduled tasks, as well as persisting them.

First install APScheduler and Tornado 6:

pip3 install apscheduler
pip3 install tornado==6.1

Then import the asynchronous, Tornado-based APScheduler:

from datetime import datetime
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import RequestHandler, Application, url
from apscheduler.schedulers.tornado import TornadoScheduler

Here the TornadoScheduler instance hooks into Tornado's event loop. Next, declare the asynchronous timed task:

async def task(job_id=None):
    # job_id is optional: the startup job passes no args, while the dynamic
    # handler further below passes the job id through args=(job_id,)
    print('[APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')))

Then initialize the timed task object:

scheduler = None

# initialize the scheduler
def init_scheduler():
    global scheduler
    scheduler = TornadoScheduler()
    scheduler.start()
    scheduler.add_job(task, "interval", seconds=3, id="job1", args=())
    print("定時任務啟動")

After the scheduler starts, a job is added that runs once every three seconds.
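Besides adding jobs, APScheduler can also adjust a running job in place through its pause_job / resume_job / reschedule_job APIs. A minimal sketch, assuming init_scheduler() above has already run:

scheduler.pause_job("job1")                                        # stop firing, keep the job
scheduler.resume_job("job1")                                       # start firing again
scheduler.reschedule_job("job1", trigger="interval", seconds=10)   # change the interval
print(scheduler.get_job("job1"))                                   # inspect the stored job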

Next, start the service from the main entry point:

if __name__ == '__main__':
    init_scheduler()
    IOLoop.current().start()

The system returns:

C:\Users\liuyue\www\tornado6>python test_scheduler.py
定時任務啟動
[APScheduler][Task]-2022-07-28 22:13:47.792582
[APScheduler][Task]-2022-07-28 22:13:50.783016
[APScheduler][Task]-2022-07-28 22:13:53.783362
[APScheduler][Task]-2022-07-28 22:13:56.775059
[APScheduler][Task]-2022-07-28 22:13:59.779563

Then create a Tornado controller view:

job_ids = []  # ids of dynamically added jobs

class SchedulerHandler(RequestHandler):
    def get(self):
        job_id = self.get_query_argument('job_id', None)
        action = self.get_query_argument('action', None)
        if job_id:
            # add a task
            if 'add' == action:
                if job_id not in job_ids:
                    job_ids.append(job_id)
                    scheduler.add_job(task, 'interval', seconds=3, id=job_id, args=(job_id,))
                    self.write('[TASK ADDED] - {}'.format(job_id))
                else:
                    self.write('[TASK EXISTS] - {}'.format(job_id))
            # remove a task
            elif 'remove' == action:
                if job_id in job_ids:
                    job_ids.remove(job_id)
                    scheduler.remove_job(job_id)
                    self.write('[TASK REMOVED] - {}'.format(job_id))
                else:
                    self.write('[TASK NOT FOUND] - {}'.format(job_id))
        else:
            self.write('[INVALID PARAMS] INVALID job_id or action')

Here asynchronous timed tasks are added and removed dynamically via query parameters. A scheduled job whose polling work is done can be removed outright, saving system resources. Then add the route and start the Tornado service:

if __name__ == '__main__':
    routes = [url(r"/scheduler/", SchedulerHandler)]
    init_scheduler()
    # create the Tornado application
    application = Application(routes, debug=True)
    application.listen(8888)
    IOLoop.current().start()
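With the service listening on port 8888, the handler can be exercised from the command line; for example, with a hypothetical job id job2:

curl "http://localhost:8888/scheduler/?job_id=job2&action=add"
# [TASK ADDED] - job2
curl "http://localhost:8888/scheduler/?job_id=job2&action=remove"
# [TASK REMOVED] - job2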

APScheduler timed-task persistence

So-called task persistence means storing tasks in a durable container such as a file or database. If the APScheduler service process is interrupted, unexecuted tasks survive; when the service starts again, the scheduled tasks can be read back from the database and loaded once more. Here the Redis database serves as the example:

from apscheduler.jobstores.redis import RedisJobStore

# initialize with a Redis-backed job store
def init_scheduler():
    global scheduler
    jobstores = {
        'default': RedisJobStore(jobs_key='cron.jobs', run_times_key='cron.run_times',
                                 host='localhost', port=6379)
    }
    scheduler = TornadoScheduler(jobstores=jobstores)
    scheduler.start()
    scheduler.add_job(task, "interval", seconds=3, id="job1", args=())
    print("定時任務啟動")

Here the jobstores parameter plugs Redis into the timed-task service. When a task is created, its details are stored in the database as a hash:

127.0.0.1:6379> keys *
1) "cron.run_times"
2) "cron.jobs"
127.0.0.1:6379> type cron.jobs
hash
127.0.0.1:6379> hgetall cron.jobs
1) "job1"
2) "\x80\x05\x95\x14\x02\x00\x00\x00\x00\x00\x00}\x94(\x8c\aversion\x94K\x01\x8c\x02id\x94\x8c\x04job1\x94\x8c\x04func\x94\x8c\x0e__main__:task1\x94\x8c\atrigger\x94\x8c\x1dapscheduler.triggers.interval\x94\x8c\x0fIntervalTrigger\x94\x93\x94)\x81\x94}\x94(h\x01K\x02\x8c\btimezone\x94\x8c\x1bpytz_deprecation_shim._impl\x94\x8c\twrap_zone\x94\x93\x94\x8c\bbuiltins\x94\x8c\agetattr\x94\x93\x94\x8c\bzoneinfo\x94\x8c\bZoneInfo\x94\x93\x94\x8c\t_unpickle\x94\x86\x94R\x94\x8c\x0cAsia/Irkutsk\x94K\x01\x86\x94R\x94h\x19\x86\x94R\x94\x8c\nstart_date\x94\x8c\bdatetime\x94\x8c\bdatetime\x94\x93\x94C\n\a\xe6\a\x1c\x16\x1e&\x0b\xc7\x8b\x94h\x1d\x86\x94R\x94\x8c\bend_date\x94N\x8c\binterval\x94h\x1f\x8c\ttimedelta\x94\x93\x94K\x00K\x03K\x00\x87\x94R\x94\x8c\x06jitter\x94Nub\x8c\bexecutor\x94\x8c\adefault\x94\x8c\x04args\x94)\x8c\x06kwargs\x94}\x94\x8c\x04name\x94\x8c\x05task1\x94\x8c\x12misfire_grace_time\x94K\x01\x8c\bcoalesce\x94\x88\x8c\rmax_instances\x94K\x01\x8c\rnext_run_time\x94h!C\n\a\xe6\a\x1c\x16\x1e,\x0b\xc7\x8b\x94h\x1d\x86\x94R\x94u."

If a task is deleted, the corresponding task in the Redis database is removed synchronously as well.

At this point, a simple concurrent asynchronous timed-task service built with APScheduler and Tornado is complete.

Celery

Celery is a trail-blazing framework in the Python timed-task field. Compared with APScheduler, Celery is a little bloated; at the same time, Celery itself has no task-persistence capability either and likewise needs a third-party container for support.

First install version 5.0 or above:

pip3 install celery==5.2.7

Then initialize the task object:

from celery import Celery
from datetime import timedelta

app = Celery("tornado")
app.conf["imports"] = ["celery_task"]
# define the broker
app.conf.broker_url = "redis://localhost:6379"
# task results
app.conf.result_backend = "redis://localhost:6379"
# timezone
app.conf.timezone = "Asia/Shanghai"

Both the task broker (broker_url) and the task results (result_backend) are likewise stored in Redis.
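Because results land in the Redis backend, they can be fetched later by task id. A minimal sketch using Celery's AsyncResult, assuming the app object above and a known (here hypothetical) task id:

from celery.result import AsyncResult

# look up a finished task by its id
res = AsyncResult("b0337808-c90b-450b-98bc-fd577f7039d0", app=app)
print(res.status)          # e.g. 'SUCCESS'
print(res.get(timeout=5))  # the task's return value, here 'test'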

Immediately after that, declare the asynchronous task method:

from celery import shared_task
import asyncio

async def consume():
    return 'test'

@shared_task
def async_job():
    return asyncio.run(consume())

Here the asyncio library is used to call the asynchronous method indirectly from a synchronous Celery task.
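Before wiring up beat, the task can be smoke-tested on its own. A minimal sketch, assuming a worker is already running:

# enqueue one execution and block for its result
result = async_job.delay()
print(result.get(timeout=10))  # 'test'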

Then add the configuration of the scheduled task:

from datetime import timedelta

# beat schedule: tasks to execute periodically
app.conf.beat_schedule = {
    "task1": {
        "task": "celery_task.async_job",  # the method to execute
        "schedule": timedelta(seconds=3),
        "args": ()
    },
}

Then start the worker service:

celery -A module_name worker --pool=solo -l info

Next, start the beat service:

celery -A module_name beat -l info
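During development the two processes can be collapsed into one by embedding beat in the worker via Celery's -B flag (not recommended for production):

celery -A module_name worker -B --pool=solo -l info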

The asynchronous timed tasks are then loaded and executed, and the system returns:

C:\Users\liuyue\www\tornado6>celery -A test_celery worker --pool=solo -l info
-------------- [email protected] v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.22000-SP0 2022-07-28 22:55:00
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tornado:0x23769b40430
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost:6379/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_task.async_job
. celery_task.job
. test_celery.sub
[2022-07-28 22:55:02,234: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-07-28 22:55:04,267: INFO/MainProcess] mingle: searching for neighbors
[2022-07-28 22:55:11,552: INFO/MainProcess] mingle: all alone
[2022-07-28 22:55:21,837: INFO/MainProcess] [email protected] ready.
[2022-07-28 22:58:26,032: INFO/MainProcess] Task celery_task.job[b0337808-c90b-450b-98bc-fd577f7039d0] received
[2022-07-28 22:58:28,086: INFO/MainProcess] Task celery_task.job[b0337808-c90b-450b-98bc-fd577f7039d0] succeeded in 2.062999999994645s: 'test'
[2022-07-28 22:58:28,099: INFO/MainProcess] Task celery_task.job[f4aa4304-02c3-48ee-8625-fa1fe27b8e98] received
[2022-07-28 22:58:28,099: INFO/MainProcess] Task celery_task.job[f4aa4304-02c3-48ee-8625-fa1fe27b8e98] succeeded in 0.0s: 'test'
[2022-07-28 22:58:28,975: INFO/MainProcess] Task celery_task.job[bb33981d-0629-4173-8375-128ba84d1f0f] received
[2022-07-28 22:58:28,975: INFO/MainProcess] Task celery_task.job[bb33981d-0629-4173-8375-128ba84d1f0f] succeeded in 0.0s: 'test'

Meanwhile, task details and results are stored in Redis as lists and strings:

127.0.0.1:6379> keys *
1) "celery-task-meta-f4aa4304-02c3-48ee-8625-fa1fe27b8e98"
2) "celery-task-meta-bb33981d-0629-4173-8375-128ba84d1f0f"
3) "_kombu.binding.celery"
4) "celery-task-meta-b0337808-c90b-450b-98bc-fd577f7039d0"
5) "cron.run_times"
6) "cron.jobs"
7) "celery"

At the scheduling level there is not much difference between Celery and APScheduler, but in terms of running cost Celery needs one more service than APScheduler, and the dual worker-plus-beat arrangement also adds monitoring and resource overhead to the system.

Dynamic maintenance of asynchronous timed tasks

From a task-management perspective, Celery loses outright, because native Celery has no support at all for dynamically modifying scheduled tasks. But we can get there by a detour, via a third-party library:

pip3 install redisbeat

Here a Redis-backed scheduler replaces Celery's native beat service as the timed-task scheduler.

Create a redisbeat instance:

from celery import Celery
from datetime import timedelta
from redisbeat.scheduler import RedisScheduler

app = Celery("tornado")
app.conf["imports"] = ["celery_task"]
# define the broker
app.conf.broker_url = "redis://localhost:6379"
# task results
app.conf.result_backend = "redis://localhost:6379"
# timezone
app.conf.timezone = "Asia/Shanghai"

@app.task
def sub():
    return "test"

scheduler = RedisScheduler(app=app)
scheduler.add(**{
    'name': 'job1',
    'task': 'test_celery.sub',
    'schedule': timedelta(seconds=3),
    'args': ()
})

Timed tasks are added dynamically through the scheduler.add method; then start the Celery beat service in redisbeat form:

celery -A test_celery beat -S redisbeat.RedisScheduler -l INFO

The reworked system now accepts dynamic task calls and executes them:

C:\Users\liuyue\www\tornado6>celery -A test_celery worker --pool=solo -l info
-------------- [email protected] v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.22000-SP0 2022-07-28 23:09:50
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tornado:0x19c1a1f0040
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost:6379/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_task.async_job
. celery_task.job
. test_celery.sub
[2022-07-28 23:09:52,916: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-07-28 23:09:54,971: INFO/MainProcess] mingle: searching for neighbors
[2022-07-28 23:10:02,140: INFO/MainProcess] mingle: all alone
[2022-07-28 23:10:12,427: INFO/MainProcess] [email protected] ready.
[2022-07-28 23:10:12,440: INFO/MainProcess] Task test_celery.sub[ade9c5ad-d551-44f2-84e7-a2824b2d022d] received
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[ade9c5ad-d551-44f2-84e7-a2824b2d022d] succeeded in 2.0780000000013388s: 'test'
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[11927889-8385-4c88-aff1-42179b559db0] received
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[11927889-8385-4c88-aff1-42179b559db0] succeeded in 0.0s: 'test'
[2022-07-28 23:10:14,533: INFO/MainProcess] Task test_celery.sub[442cd168-5a68-4ade-b4e7-6ae4a92a53ae] received
[2022-07-28 23:10:14,533: INFO/MainProcess] Task test_celery.sub[442cd168-5a68-4ade-b4e7-6ae4a92a53ae] succeeded in 0.0s: 'test'
[2022-07-28 23:10:17,087: INFO/MainProcess] Task test_celery.sub[e4850b5d-28e9-47c8-88e6-d9086e93db88] received
[2022-07-28 23:10:17,087: INFO/MainProcess] Task test_celery.sub[e4850b5d-28e9-47c8-88e6-d9086e93db88] succeeded in 0.0s: 'test'

Correspondingly, tasks can also be deleted via the remove method and the task id:

scheduler.remove('job1')
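These add/remove calls can be wired into the same kind of Tornado handler used earlier for APScheduler. A minimal sketch, with CeleryTaskHandler as a hypothetical name and the app/scheduler objects from above assumed in scope:

# hypothetical handler: dynamic Celery task management over HTTP (sketch)
class CeleryTaskHandler(RequestHandler):
    def get(self):
        job_id = self.get_query_argument('job_id', None)
        action = self.get_query_argument('action', None)
        if job_id and action == 'add':
            scheduler.add(**{
                'name': job_id,
                'task': 'test_celery.sub',
                'schedule': timedelta(seconds=3),
                'args': ()
            })
            self.write('[TASK ADDED] - {}'.format(job_id))
        elif job_id and action == 'remove':
            scheduler.remove(job_id)
            self.write('[TASK REMOVED] - {}'.format(job_id))
        else:
            self.write('[INVALID PARAMS] INVALID job_id or action')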

The storage format of task details has also been upgraded from a list to a sorted set, improving efficiency:

127.0.0.1:6379> type celery:beat:order_tasks
zset
127.0.0.1:6379> zrange celery:beat:order_tasks 0 -1
1) "{\"py/reduce\": [{\"py/type\": \"celery.beat.ScheduleEntry\"}, {\"py/tuple\": [\"job1\", \"test_celery.sub\", {\"__reduce__\": [{\"py/type\": \"datetime.datetime\"}, [\"B+YHHBcMDgfyGg==\", {\"py/reduce\": [{\"py/function\": \"pytz._p\"}, {\"py/tuple\": [\"Asia/Shanghai\", 28800, 0, \"CST\"]}]}]], \"py/object\": \"datetime.datetime\"}, 43, {\"py/reduce\": [{\"py/type\": \"celery.schedules.schedule\"}, {\"py/tuple\": [{\"py/reduce\": [{\"py/type\": \"datetime.timedelta\"}, {\"py/tuple\": [0, 3, 0]}]}, false, null]}]}, {\"py/tuple\": []}, {}, {}]}]}"

At this point, asynchronous timed tasks with Celery and Tornado are complete as well.

Conclusion

APScheduler wins on agility and can attach itself to Tornado's event loop, while Celery excels at scheduling and distributed support and stands relatively on its own. Neither strictly beats the other; each shines in its own arena and fits different business scenarios. Of course, the handling strategy when an asynchronous timed task fails still leaves plenty of room for improvement: stale triggers caused by a hung instance, task catch-up and task pile-up, and, in workflow scenarios, whether to retry the whole flow or resume from the failed step after an exception. Each concrete problem calls for its own analysis.


