Celery接入方法

wware中可以使用  Celery(分布式任务队列) . 但需要手动完成一些初始化步骤和手写Python代码. 

Celery初始化

    1. 更新Server 

    2. ssh 登录到服务器中安装Celery:  bash /home/httpd/wware/tools/update/celery.sh

    3. 拷贝配置文件: cp /home/httpd/wware/start.ini /home/httpd/start.ini

    4. 在/home/httpd/start.ini 文件中增加如下配置:

[celery]
;必填. 1 表示启用celery. 非 1 表示不启用
enable = 1
;选填. 初始worker进城数量,可以不指定,默认为CPU核心数量
concurrency = 10
;选填. 任务的最大执行时间,int/float类型的数字,单位为秒
timelimit= 30
;选填. worker进程池自动伸缩,需要设置两个值 [最大,最小] 数量.例如 10,3
autoscale=10,3
;选填. 对worker进程的心跳检查间隔,int类型的数字,单位为秒.
heartbeatinterval=1
;选填. 每个工作进程在没有被终止或者被替换之前,能够执行的最大任务数量,可以解决内存泄露的问题. int类型的数字,单位为秒.默认是没有限制的.
maxtasksperchild=1000

    5. 启动Celery: bash /home/httpd/wware/tools/restartserver.sh

完成task脚本

规则: 在项目文件夹 lib 中新建 python 文件夹,在此文件夹中下的所有 * .py    文件都会被系统自动加载,如果符合celery的task,则会被自动注册为task.

举例说明: 假设网站域名为test.com

    1. test.com/lib/python/下新建 job.py 文件,文件内容拷贝如下模板

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from jobs import app,storage
'''
app: Celery 中 application 对象. 初始配置参数为 /home/httpd/start.ini 中所指定的.
  参考官方文档: http://docs.celeryproject.org/en/latest/userguide/application.html#application

storage: 为 wware 所提供的维护 wware 数据库主键与磁盘路径映射规则的 API
  参考官方文档: 

'''
# !!! 为保证能够正确执行,以上内容务必保留 !!!#

import os
import sys

# Elasticsearch Python API 参考: https://elasticsearch-py.readthedocs.io/en/master/
from elasticsearch import Elasticsearch
# 此处为示例演示,具体配置参考: https://elasticsearch-py.readthedocs.io/en/master/#sniffing
es = Elasticsearch()

@app.task()
def taskFunction(env,*data):
    '''
    参数:
      @env: 系统内置参数.Dictionary类型
        _domain:发起本任务调用的 domain

      @*args: 自定义参数,自行决定参数类型和数量
      
    返回值:
      None

    '''
  # 这里演示如何获取一条记录
  rootUser = es.get(index=env['_domain'], doc_type="user", id='25449deaa4531d3fa4531d3fa4531d3f')['_source']
  # 这里演示如何获取记录对应的磁盘路径
  icon = storage.getNodePath(env['_domain'],'user',"25449deaa4531d3fa4531d3fa4531d3f","public/icon.png")
  # 这里打印所有传入的参数
  print('task 参数: ',data)
  print(env['_domain'],' 用户信息: ',rootUser)
  print('用户头像地址: ',icon)  

    2. 新建一个后端视口(例如celery.json)中插入代码段 "celery" . " job名称" 固定值 填写 job.taskFunction; job所需参数   固定值 填写 hello celery. 然后发布此视口

    3.  wide主界面--服务器--打开终端--功能--查看WWARE日志

    4. 浏览器中访问 /celery.json 这个视口,可以从 "终端" 窗口中看到 如下内容表示执行正常:

....
[2018-09-12 11:06:27,092: WARNING/ForkPoolWorker-4] task 参数:
[2018-09-12 11:06:27,093: WARNING/ForkPoolWorker-4] ('hello celery',)
....
[2018-09-12 11:06:28,541: WARNING/ForkPoolWorker-4] test.com
[2018-09-12 11:06:28,541: WARNING/ForkPoolWorker-4] 用户信息:
[2018-09-12 11:06:28,541: WARNING/ForkPoolWorker-4] {'username': 'root', 'role': ['admin'], 'sessionid': ['xBGgm3bGTN3iWkrXS9KDO7vMKMGMP_O3'], 'password': 'a920cd6ec701952411d19c5611d19c56'}
....
[2018-09-12 11:06:28,542: WARNING/ForkPoolWorker-4] 用户头像地址:
[2018-09-12 11:06:28,542: WARNING/ForkPoolWorker-4] /home/httpd/data/db/test.com/user/25/44/9dea/a453/1d3f/a453/1d3f/a453/1d3f/public/icon.png

解释说明:

1. python文件定义了若干任务(仅是定义,等待被触发则会执行),每个任务有唯一的名称, 名称格式为:   文件名.函数名  

2. 每个任务至少有一个参数 env 此参数作为系统预留参数,会传入系统信息,比如域名等. 其余参数根据实际所需自行定义.

3. celery 代码段可以触发任务执行.并且可以传入参数.

 

storage 文档

storage对象所暴露的函数:

def key2id(key):
    '''
    任意字符串通过 murmurhash3 算法得到一个16进制表示的id

    参数:
      @key:  [string] 任意非空字符串
      
    返回值:      
      [string] 参数经过 murmurhash3 计算的出来的id

    异常:
      key must not empty:  当传入了空字符的时候抛出

    示例:
      storage.key2id('abc')
      
    '''

def isid(idstr):
    '''
    判断指定的字符串是否为一个合法的_id类型字符串.合法id类型格式为: 只能是数字 0-9 以及 字母a-f组成的长度为32的字符串

    参数:
      @key:  [string] 任意非空字符串
      
    返回值:      
      [boolean] True/False. True表示参数是一个合法的_id,False表示参数不是合法_id
    
    示例:
      storage.isid('abc')
      
    '''

def getNodePath (domain,table,nodeId,relpath):
    '''
    获取指定域名,表名,主键,相对路径,所对应的磁盘绝对路径,通过此路径可以访问对应的文件
    参数:
      @domain:  [string] 域名
      @table:   [string] 表名
      @nodeId:  [string] node的主键,可以是_id格式,如果不是_id格式的字符串会被转换为_id.
      @relpath: [string] 信息结构图中的 "存储位置". 相对于node对应磁盘9级目录的文件的相对路径. 

    返回值:      
      [string] 文件对应的磁盘绝对路径

    异常:
      Invalid domain:  当 domain 参数的值为空时抛出
      Invalid table:   当 table 参数的值为空时抛出
      Invalid id:      当 nodeId 参数的值为空时抛出

    示例:
      getNodePath('test.com','user',"75cdc6d1a2b006a5a2b006a5a2b006a5",'public/abc.png')
        
    '''

def getNodeUrl (domain,table,nodeId,relpath):
    '''
    获取指定域名,表名,主键,相对路径,所组成的文件url,通过此url可以访问磁盘文件.
    参数:
      @domain:  [string] 域名
      @table:   [string] 表名
      @nodeId:  [string] node的主键,可以是_id格式,如果不是_id格式的字符串会被转换为_id.
      @relpath: [string] 信息结构图中的 "存储位置". 相对于node对应磁盘9级目录的文件的相对路径. 

    返回值:      
      [string] 相对于当前域名的相对url地址

    异常:
      Invalid domain:  当 domain 参数的值为空时抛出
      Invalid table:   当 table 参数的值为空时抛出
      Invalid id:      当 nodeId 参数的值为空时抛出
    '''