程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Python asynchronous task processing framework -- celery

編輯:Python

About Celery

Celery It's a very simple one 、 flexible 、 Reliable distributed system , Can be used to handle a large number of messages , It also provides a set of tools for operating the system , meanwhile Celery Is a message queuing tool , It can be used to process real-time data and task scheduling .

Celery Keyword concept

  • Task

    Asynchronous tasks and scheduled tasks

  • Broker

    Represents an intermediary , It is responsible for receiving the tasks published by the producer and storing the tasks in the queue , Then the consumers waiting for the task are the following Worker To deal with it . however Celery It does not provide queue service itself , But configuration items are provided to implement , Usually by Redis or RabbitMQ Implement queue service .

  • Worker

    It literally means workers , It's actually the consumer who performs the task , It monitors message queues in real time , If there is a task, get the task and execute it .

  • Beat

    Timed task scheduler , Send the specified task to at the specified time according to the configuration timing related parameters Broker( A middleman ).

  • Backend

    Used to store the execution results of tasks . You can configure the redis perhaps database As backend

celery How to use

For example, the site registration needs to send an activation email to the user after the user registration is completed , It takes some time to send mail in the background , And you can't synchronously wait for the mail to be sent before responding to the page , This is a very bad user experience , At this time, we need an asynchronous framework (celery) To help us accomplish these tasks .

Based on the command line ( Not django In the environment )

  • Local test environment :Windows 10+Python 3.6+Celery 4.3.0+redis 2.4.5
    pip install redis The installation is convenient for the client to connect , There will also be a local redis Server side

  • adopt pycharm The initialization name is celerydemo Project

  • newly build tasks.py file , The contents are as follows :

    from celery import Celery
    # The first parameter "my_task" yes celery Instance application name 
    app=Celery("my_task",broker="redis://localhost:6379/0",backend="redis://localhost:6379/1")
    @app.task
    def send_mail():
    print(" Sending mail ****************************")
    return " Mail sent successfully "
    
  • newly build app.py file , The contents are as follows :

    from tasks import send_mail
    if __name__ == '__main__':
    result=send_mail.delay()
    print(result)
    
  • adopt pycharm open terminal, As shown in the figure below :

  • Carry out orders celery worker -A tasks -l info -P eventlet

    -A Indicates the module name of the current task , Here is the task.py The name of the file ;
    -l Express celery The log level of is as follows info、debug
    -P Express Pool implementation, Thread pool implementation class ?

  • Open a new one terminal Windows execute commands python app.py, The results are as follows :
    The red arrow in the above figure indicates the need for attention , You can see that the task has been executed , And the return value is also output

  • Re pass pycharm open python console Interface , Execute the following commands in turn

    >>>from tasks import send_mail
    >>>send_mail.name
    'tasks.send_mail'
    >>>send_mail.app
    <Celery tasks at 0x22353e56978>
    >>>result=send_mail.delay()
    result
    <AsyncResult: dccd1c2b-8737-4bf7-afd8-594887b52fe8>
    >>>result.ready()
    True
    >>>result.get()
    ' Mail sent successfully '
    

Use based on configuration

  • Create a new file named... Under the root directory celery_app Of python package

  • stay celery_app Create a new celeryconfig.py file , The contents are as follows :

    BROKER_URL = 'redis://127.0.0.1:6379/0'
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    # Import the specified task module 
    CELERY_IMPORTS=['celery_app.task']
    

    For more configuration parameters, please refer to Celery Configuration parameters as well as celery Of configuration.

  • stay celery_app Of __init__.py initialization celery Application example , The contents are as follows :

    from celery import Celery
    app=Celery("demo")
    # You can load configuration from the configuration object .
    app.config_from_object("celery_app.celeryconfig")
    
  • stay celery_app New under the directory task.py file , The contents are as follows :

    from celery_app import app
    @app.task
    def send_mail():
    print(" Sending mail ****************************")
    return " Mail sent successfully "
    
  • open terminal The window executes the following command under the root directory :
    celery worker -A celery_app -l info -P eventlet, The results are as follows :
    You can see that the contents of the configuration file are effective and celery It also identifies the tasks created

  • Open a new one terminal Windows execute commands python app.py, The results are as follows :

  • And on again python console The interface operation is as follows :

    >>> from celery_app.task import send_mail
    >>>send_mail.name
    'celery_app.task.send_mail'
    >>> result=send_mail.delay()
    >>> result.ready()
    True
    >>> result.get()
    ' Mail sent successfully '
    

Timed task use

  • take celeryconfig.py The contents are modified as follows :
    from celery.schedules import crontab,timedelta
    BROKER_URL = 'redis://127.0.0.1:6379/0'
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    # Import the specified task module 
    CELERY_IMPORTS=['celery_app.task']
    # Timing task 
    CELERYBEAT_SCHEDULE={
    
    'task1':{
    
    'task':'celery_app.task.send_mail',# The path to the function 
    'schedule':timedelta(seconds=10),# Every time 10 Seconds to send mail 
    }
    }
    
  • open terminal Carry out orders celery beat -A celery_app -l info, give the result as follows :
    * Open another one terminal Interface , Execute the following command :
    celery worker -A celery_app -l info -P eventlet
    Because of the setting of the second level , So once executed beat The command will immediately send the scheduled task and then wait 0 Repeat in seconds

Reference material

  • celery-3.1.7 Chinese document ( This document details the usage of command line parameters )
  • celery Official documents
  • celery-4.3.0 Chinese document

Problems encountered and Solutions

  • not enough values to unpack (expected 3, got 0)
    win10 Better (https://www.oneisall.top) function celery4.x That's the problem , Need to pass through pip install eventlet, At the same time starting worker Add a parameter , Such as :celery -A <mymodule> worker -l info -P eventlet that will do

  • stay [tasks] You can see the task , But by calling result.get() Prompt that the task is not registered
    The reason for this may be that the import methods you perform twice are different , As a result, the names of the automatically generated tasks are different , The simple way is to give the task Explicitly add task name Please refer to this article Celery-4.1 User guide : Task describe

  • RROR/MainProcess] consumer: Cannot connect to redis
    The connection failed redis, You can try to localhost Change to 127.0.0.1

  • Unable to receive task , Execute not execute , however celery In a ready state
    Whether the view is added when invoking the task delay() Method

  • newspaper Object of type ‘byte’ is not JSON serializable
    Can be in django Of settings.py On the inside celery Add the following content to the configuration parameters of :

    CELERY_TASK_SERIALIZER = 'pickle'
    CELERY_RESULT_SERIALIZER = 'pickle'
    CELERY_ACCEPT_CONTENT = ['pickle', 'json']
    

    add to pickle, In this case, if the task function contains object parameters , The problem of object serialization will not be reported during task execution , About pickle Please refer to pickle Module details .
    Welcome to my blog [ No rain and clear sky blog ]


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved