程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

python 異步任務框架 Celery 入門,速看

編輯:Python

簡介

Celery 是使用 python 編寫的分布式任務調度框架。


它有幾個主要的概念:

celery 應用

用戶編寫的代碼腳本,用來定義要執行的任務,然後通過 broker 將任務發送到消息隊列中

broker

代理,通過消息隊列在客戶端和 worker 之間進行協調。
celery 本身並不包含消息隊列,它支持一下消息隊列RabbitMQRdisAmazon SQSZookeeper
更多關於 Broker 見官方文檔

backend

數據庫,用來存儲任務返回的結果。

worker

工人,用來執行 broker 分派的任務。

任務

任務,定義的需要執行的任務

版本要求

Celery5.1 要求:

python(3.6,3.7,3.8)
Celery 是一個資金最少的項目,所以我們不支持 Microsoft Windows。

更多更詳細的版本要求見官方文檔
安裝

使用 pip 安裝:

pip install -U Celery

捆綁包

Celery 還定義了一組包,用於安裝 Celery 和給定的依賴項。

可以在 pip 命令中實現中括號來指定這些依賴項。

pip install "celery[librabbitmq]"
pip install "celery[librabbitmq,redis,auth,msgpack]"

具體支持的依賴包見官方文檔。

簡單使用

1. 選擇一個 broker

使用 celery 首先需要選擇一個消息隊列。安裝任意你熟悉的前面提到的 celery 支持的消息隊列。

2. 編寫一個 celery 應用

首先我們需要編寫一個 celery 應用,它用來創建任務和管理 wokers,它要能夠被其他的模塊導入。

創建一個tasks.py 文件:

from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y

第一個參數tasks是當前模塊的名稱,它可以省略,建議以當前模塊名為名稱。

第二個關鍵字參數 broker='redis://localhost:6379/0’指定我們使用 Redis 作為消息隊列,並指定連接地址。

3.運行 celery 的 worker 服務

cd 到 tasks.py 所在目錄,然後運行下面的命令來啟動 worker 服務

celery -A tasks worker --loglevel=INFO

4. 調用任務

 from tasks import add
add.delay(4,4)

通過調用任務的 delay 來執行對應的任務。celery 會把執行命令發送到 broker,broker 再將消息發送給 worker 服務來執行,如果一切正常你將會在 worker 服務的日志中看到接收任務和執行任務的日志。

5. 保存結果

如果你想要跟蹤任務的狀態以及保存任務的返回結果,celery 需要把它發送到某個地方。celery 提供多種結果後端。

我們這裡以 reids 為例,修改 tasks.py中的代碼,添加一個 Redis 後端。

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

更多結果後端見官方文檔。

重新啟動 worker 服務,重新打開 python 解釋器

 from tasks import add
result = add.delay(4,4)

ready()方法返回任務是否執行完成:

result.ready()
False

還可以等待結果完成,但很少使用這種方法,因為它將異步調用轉換為同步調用

 result.get(timeout=1)
8

在應用中使用 celery

創建項目

項目結構:

proj/__init__.py
/celery.py
/tasks.py

proj/celery.py

from celery import Celery
app = Celery('proj',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['proj.tasks']
)# 配置
app.conf.update(
result_expires=3600, # 結果過期時間
)

在這個模塊中我們創建了一個 Celery 模塊。要在你的項目中使用 celery 只需要導入此實例。

proj/tasks.py

from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.tas
kdef xsum(numbers)
return sum(numbers)

啟動 worker

celery -A proj worker -l INFO

調用任務

 from proj.tasks import add
add.delay(2, 2)

在 django 中使用celery

要在你的 django 項目中使用 celery,首先需要定義一個 Celery 的實例。

如果你又 django 項目如下:

- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py

那麼推薦的方法是創建一個新的proj/proj/celery.py模塊來定義芹菜實例:file:proj/proj/celery.py

import os
from celery import Celery
# 為`celery`設置默認的django設置模塊
os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')
app = Celery('proj')
# 設置配置來源
app.config_from_object('django.conf:settings',namespace='CELERY')
# 加載所有的已注冊django應用中的任務
app.autodiscover_tasks()
@app.task(bind=True) def debug_task(self): print(f'Request: {
self.request!r}')

然後你需要在你的 proj/proj/init.py模塊中導入這個應用程序。這樣就可以保證 Django 啟動時加載應用程序,以便於 @shared_task 裝飾器的使用。

proj/proj/init.py:

from .celery import app as celery_app
__all__ = ('celery_app',)

請注意,此示例項目布局適用於較大的項目,對於簡單的項目,可以使用包含定義應用程序和任務的單個模塊。

接下來我們來解釋一下 celery.py 中的代碼,首先,我們設置celery命令行程序的環境變量DJANGO_SETTINGS_MODULE的默認值:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

這一行的作用是加載當前 django 項目的環境設置,特別是當需要在異步任務中用到 ORM。它必須在創建應用程序實例之前。

app = Celery('proj')

我們還添加了 Django 設置模塊作為 Celery 的配置源。這意味著我們不必使用多個配置文件,而是直接在 Django 的配置文件中配置 Celery。

app.config_from_object('django.conf:settings', namespace='CELERY')

大寫命名空間意味著所有Celery配置項必須以大寫指定,並以 CELERY_ 開頭,因此例如broker_url 設置變為 CELERY_BROKER_URL。

例如,Django 項目的配置文件可能包括:

settings.py

CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30*60

接下來,可重用應用程序的常見做法是在單獨的tasks.py模塊中定義所有任務Celery有一種方法可以自動發現這些模塊:

app.autodiscover_tasks()

使用上面的行,Celery 將按照tasks.py 約定自動從所有已安裝的應用程序中發現任務:

- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py

這樣就不必手動將各個模塊添加到CELERY_IMPORTS 設置中。

使用 @shared_task 裝飾器

我們編寫的任務可能會存在於可重用的應用程序中,而可重用的應用程序不能依賴與項目本身,因此無法直接導入 celery 應用實例。

@shared_task裝飾器可以讓我們無需任何具體的 celery 實例創建任務:demoapp/tasks.py

# Create your tasks here
from demoapp.models import Widget
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
@shared_task
def count_widgets():
return Widget.objects.count()
@shared_task
def rename_widget(widget_id, name):
w = Widget.objects.get(id=widget_id)
w.name = name
w.save()

最後感謝每一個認真閱讀我文章的人,下面這個網盤鏈接也是我費了幾天時間整理的非常全面的,希望也能幫助到有需要的你!

這些資料,對於想轉行做【軟件測試】的朋友來說應該是最全面最完整的備戰倉庫,這個倉庫也陪伴我走過了最艱難的路程,希望也能幫助到你!凡事要趁早,特別是技術行業,一定要提升技術功底。希望對大家有所幫助……

如果你不想一個人野蠻生長,找不到系統的資料,問題得不到幫助,堅持幾天便放棄的感受的話,可以點擊下方小卡片加入我們群,大家可以一起討論交流,裡面會有各種軟件測試資料和技術交流。

點擊文末小卡片領取

敲字不易,如果此文章對你有幫助的話,點個贊收個藏來個關注,給作者一個鼓勵。也方便你下次能夠快速查找。

自學推薦B站視頻:

零基礎轉行軟件測試:25天從零基礎轉行到入職軟件測試崗,今天學完,明天就業。【包括功能/接口/自動化/python自動化測試/性能/測試開發】

自動化測試進階:2022B站首推超詳細python自動化軟件測試實戰教程,備戰金三銀四跳槽季,進階學完暴漲20K


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved