1. Celery 入门教程

[success] 介绍说明

  • Celery 是一个分布式任务队列管理工具,使用它可以很好的完成一些无须及时响应的后台耗时操作。
  • 官方文档地址
  • 文中代码目录结构:
    ├── app
    │   ├── celery_task.py
    │   ├── common.py
    │   ├── __init__.py
    │   └── runner.py
    ├── celeryconf.py
    └── tasks
    │   ├── celery_worker.py     
    │   ├── __init__.py
    

1.1. 安装

1.1.1. 安装环境

本文的系统环境为: Ubuntu 16.04

Python 环境为: Python 3.6.5

1.1.2. RabbitMQ 安装和简单配置

RabbitMQ安装官方文档

  • 步骤1 签名秘钥

    将签署 RabbitMQ 版本的秘钥添加到apt-key

    $ apt-key adv --keyserver “hkps.pool.sks-keyservers.net” --recv-keys “06B73A36E6026DFCA”
    

    下载和导入秘钥

    $ wget -O  - “https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc” | sudo apt-key add -
    
  • 步骤2 选择版本库

    bionic for Ubuntu 18.04
    xenial for Ubuntu 16.04
    stretch for Debian Stretch
    jessie for Debian Jessie
    

    文章环境是基于Ubuntu 16.04,所以命令为:

    $ deb http://dl.bintray.com/rabbitmq-erlang/debian xenial erlang
    
  • 步骤3 安装Erlang和RabbitMQ 运行命令

    $ sudo apt-get update
    $ sudo apt-get install erlang (或erlang-nox)
    $ sudo apt-get install rabbitmq-server
    

1.1.3. RabbitMQ简单使用

  1. 开启页面管理插件

    $ rabbitmq-plugins enable rabbitmq_management
    
  2. 配置 vhost 以供 Celery 使用

    • 浏览器打开 http://localhost:15672
    • 默认账户为guest 密码guest
    • 进入用户管理界面
    • 新增用户 soc,密码自拟,tags填 administrator
    • 例如 用户名:soc 密码:soc_password
    • 进入vhost管理界面
    • 新增 vhost 虚拟机如 learn,此时 learn虚拟机是没有被分配用户的。
    • 在 vhosts 虚拟机列表中选中新建的 learn 虚拟主机进入详情页,在 Permissions 栏中选择 User 为SOC,再确认即可。

1.2. 配置

配置-官方文档地址

最简单的配置文件:

  # app/tasks/celery_worker.py
  from celery import Celery
  learn = Celery(__name__, broker="amqp://soc@soc_password@127.0.0.1/learn", backend="redis://localhost:6379:3")
  # broker 任务队列(必须的)    backend 结果队列 (非必须的)

1.2.1. 配置导入方法

  • 方法1: 从其他配置文件导入Celery配置(最常用)
    其他配置文件celeryconf.py

      # celeryconf.py
      CELERY_BROKER_URL = "amqp://soc:soc_password@127.0.0.1/learn"  # 消息中间件,用来调度任务队列
      CELERY_BACKEND_URL = "redis://localhost:6379/3"  # 任务处理后用于保存结果
      CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间
    

    Celery配置文件celery_worker.py

      # app/tasks/celery_worker.py
      from celery import Celery
      from .. import celeryconf
    
      learn = Celery(__name__, broker=celeryconf.CELERY_BROKER_URL, backend=celeryconf.CELERY_BACKEND_URL)
      learn.config_from_object("celeryconf")  # 从配置文件导入
    
  • 方法2: update导入
    适用情况: 存在多个Celery应用,需要定制化导入时。
    Celery配置文件celery_worker.py

      # app/tasks/celery_worker.py   
          learn.conf.update(
            task_serializer='json',
            accept_content=['json'],  # 新式导入方法,用小写
            result_serializer='json',
            )
    
  • 方法3: 直接导入
    Celery配置文件celery_worker.py
        # app/tasks/celery_worker.py   
        learn.conf.CELERY_TASK_SERIALIZER = 'json'
    

1.3. 任务

任务-官方文档地址

任务定义了被调用时发生的动作,使用task()装饰器可以轻松从任何地方创建可调用的Celery任务。

1.3.1. 任务导入

[warning] 特别注意: 想要任务执行,第一步就是在Celery配置中添加任务文件所在路径。

任务导入的配置方法:
Celery配置文件celery_worker.py

# app/tasks/celery_worker.py
...
learn.conf.CELERY_IMPORTS = (  # 老版本方法,但有时Celery启动时会提示使用新方法。
   "app.celery_task"
)

learn.conf.imports = (  # 新方法
   "app.celery_task"
)

1.3.2. 基本任务

最基础的,也是最常用的Celery任务的创建。
Celery任务文件celery_task.py

# app/celery_task.py
from tasks.celery_worker import learn  # learn是之前定义的Celery任务实例

@learn.task
def task1(x, y):
  return x + y

1.3.3. 装饰器版

[warning] 特别注意: 当多个装饰器和任务装饰器结合使用的时候,必须保证最后使用任务装饰器

Celery任务文件celery_task.py

# app/celery_task.py
from tasks.celery_worker import learn

from app.common import get_run_time

...
@learn.task
@get_run_time
def task2(num):
    return num*10

其他装饰器文件common.py

# app/common.py
import time

def get_run_time(func):
    def wrapper(*args, **kwargs):
        s = time.time()
        result = func(*args, **kwargs)
        print(f"==耗时: {time.time() - s}==")
        return result
    return wrapper

1.3.4. 修改任务名

有时候因为文件所在路径问题,可能导致任务的名字可能很长,缀有无用的信息如:

Task app.celery_task.task3 (此为默认名字)

这时可以通过显式声明来自定义任务的名称,示例如下:
Celery任务文件celery_task.py

# app/celery_task.py
@learn.task(name="learn3")
def task3():              
    return datetime.now()

[warning] 特别注意: 自动名称生成不能和相对导入很好的结合。

推荐的导入方式:

from project.module import foo  # GOOD
from .module import foo  # GOOD

糟糕的导入方式:

from module import foo  # BAD

1.3.5. 其他的

其他的配置如:

  1. 是否存储任务结果 @learn.task(ignore_result=False)
  2. 是否为任务执行后确认而不是执行前确认(默认为执行前确认)@learn.task(acks_late=Flase)。 Celery的任务,默认开始执行就从消息队列里消除,不关注是否正确执行得到结果, 设置acks_late可以重置为当任务执行后才从消息队列中删除,如果任务逻辑是异常的可能导致无限重复执行该错误代码。
  3. 任务重试
  4. 获取请求信息 如需获取任务的详情,可以通过app.task.request来获取。官方文档说明

1.4. 调用

调用-官方文档地址

Celery任务调用主要有两种方法:

  1. apply_async(args, *kwargs) 发送任务消息,支持多种选项

  2. delay(args, *kwargs) 发送任务信息的快捷方式,但不支持执行选项

1.4.1. 单独调用

Celery任务调用文件runner.py

# app/runner.py
from app.celery_task import *

if __name__ == "__main__":
t1 = task1.delay(2, 5)

1.4.2. 批量调用

Celery任务调用文件runner.py

# app/runner.py
from celery.result import GroupResult
from celery import group
from app.celery_task import *

if __name__ == "__main__":

    tasks = group(task4.s(i) for i in range(1, 10))()
    res = [task.get() for task in tasks.results]
    print(res)

1.5. 结果

结果-官方文档地址

1.5.1. 在非Celery任务中调用Celery任务结果

Celery任务调用文件runner.py

# app/runner.py
from app.celery_task import *

if __name__ == "__main__":
    info = task1.delay(1,2)
    print(info.get())

1.5.2. 在Celery任务中调用其他的Celery任务结果

Celery任务调用文件runner.py

# learn/celery_task.py

from celery.result import allow_join_result
...
@learn.task(trail=True)
def task5(num):
    task = task4.delay(num)
    with allow_join_result():
        result = task.get()
    return result

1.5.3. celery任务中批量获取其他celery任务的结果

Celery任务调用文件runner.py

# learn/celery_task.py
from celery.result import allow_join_result
from celery import group
@learn.task
def task6(num):
    result = []
    tasks = group(task4.s(i) for i in range(num)).apply_async()
    for task in tasks:
        with allow_join_result():
            result.append(task.get())
    return result

1.6. 高级使用方法

高级使用方法-官方文档地址

1.6.1. group: 接受一个任务列表

[warning] 特别注意: group调用有时会引发奇怪的Celery的错误,其引发原因暂不确定,可搜到的资料链接

Celery引发的错误大致如下:

Traceback (most recent call last):
  File "/home/mrq/.pyenv/versions/3.6.5/envs/socintel/lib/python3.6/site-packages/celery/backends/async.py", line 154, in add_pending_result
    self._maybe_resolve_from_buffer(result)
  File "/home/mrq/.pyenv/versions/3.6.5/envs/socintel/lib/python3.6/site-packages/celery/backends/async.py", line 160, in _maybe_resolve_from_buffer
    result._maybe_set_cache(self._pending_messages.take(result.id))
  File "/home/mrq/.pyenv/versions/3.6.5/envs/socintel/lib/python3.6/site-packages/celery/utils/collections.py", line 869, in take
    raise self.Empty()
queue.Empty
...
Traceback (most recent call last):
  File "/home/mrq/.pyenv/versions/3.6.5/envs/socintel/lib/python3.6/site-packages/celery/app/trace.py", line 382, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/mrq/.pyenv/versions/3.6.5/envs/socintel/lib/python3.6/site-packages/celery/app/trace.py", line 641, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/mrq/note/socintel-server/app/tasks/schedule.py", line 39, in domain_schedule
    crawl_domains = dispatch_crawl_domain()
  File "/home/mrq/note/socintel-server/app/tasks/domain_processing.py", line 25, in dispatch_crawl_domain
    tasks = group(crawl_domain.s(domain_source) for domain_source in domain_crawl_dict.keys()).apply_async()
...
  File "/home/mrq/.pyenv/versions/3.6.5/envs/socintel/lib/python3.6/site-packages/redis/_compat.py", line 135, in nativestr
    return x if isinstance(x, str) else x.decode('utf-8', 'replace')
AttributeError: 'list' object has no attribute 'decode'

引发错误的程序语句为:

tasks = group(crawl_domain.s(domain_source) for domain_source in domain_crawl_dict.keys()).apply_async()

抛出的错误为:

AttributeError: 'list' object has no attribute 'decode'

大致猜测是group(...).apply_async()这种调用方式导致的。

根据官方文档相关内容,将引发错误的语句改为如下后,程序运行恢复正常。

tasks = group([crawl_domain.s(domain_source) for domain_source in domain_crawl_dict.keys()]).apply_async()

根据官方文档相关内容中的范例,Celery是支持引发错误的语句的原结构的,大多数情况下不会报错,但偶尔因不明确的原因可能触发上述异常。

所以使用group时,如果遇到该错误请尝试将语句结构改为:

group([celery_func.s(param) for param in params])

1.6.2. chain: 将任务以铰链的方式一环套一环,顺序执行

Celery任务调用文件runner.py

# app/runner.py
from celery import chain
...
res1 = chain(task1.s(2,3), task2.s())()
res2 = (task1.s(2,3) | task2.s())()

print(res1.get())
print(res1.parent.get()) # 获取上一个任务结果
print(res2.get())

1.6.3. chord: 组中任务执行完才执行下一个任务

Celery任务文件celery_task.py

@learn.task
def task7(alist):
    str_list = []
    int_list = []
    special_list = []
    for i in alist:
        if isinstance(i, str):
            str_list.append(i)
        elif isinstance(i, int):
            int_list.append(i)
        else:
            special_list.append(i)
    return {
        "str": len(str_list),
        "int": len(int_list),
        "special": len(special_list)
    }

Celery任务调用文件runner.py

# app/runner.py
from celery import chord
...
res1 = chord(task1.s(i, i) for i in range(10))(task7.s())
print(res1.get())

1.7. 启动

在项目根目录下执行命令:

celery -A tasks.celery_worker.learn worker

可选参数 :

  -l info 日志级别
  -c 10   并发数量
  -P gevent 并发方式(默认为线程, 也支持协程启动,如果是要协程启动需要预先安装包)
  -n learn 该celery消息队列别名
  -Q learn 指定消息队列,需在@learn.task(queue="learn")中指定

1.8. 定时任务

定时任务的配置方法:
Celery配置文件celery_worker.py

# app/tasks/celery_worker.py
from celery.schedules import crontab

...
learn.conf.CELERYBEAT_SCHEDULE = {
  "runner1": {
      "task": "learn3",  # 任务名称
      "schedule": crontab(minute=1, hour=5),  # 每天的五点一分执行
  },
  "runner2": {
      "task": "app.celery_task.task1",
      "schedule": crontab(minute="*/1"),  # 每分钟执行一次
      "args": (randint(0,100), randint(100,1000))  # 传递的参数
  }
}

定时任务启动命令需要加 -B ,示例如下:

$ celery -B -A tasks.celery_worker.learn worker --loglevel=info -n learn

results matching ""

    No results matching ""

    results matching ""

      No results matching ""