如何在Celery任务中“优雅”地管理数据库session?

我有一些操作数据库的celery任务,需要在代码开头新建session,在代码结尾关闭session。

当前的方式是:

@app.task
def celery_task():
    session = DB_Session()
    try:
        foo
    except:
        bar
    finally:
        session.close()

但问题是,这样对错误处理很不友好。而且一整个任务里都用try...except...finally感觉很难看。


最近看Celery的官方文档,想到可以新建一个自定义的MyTask,并在Task.after_return()中关闭session,然后所有celery任务定义的时候都base=MyTask。如下:

class MyTask(celery.task):
    def after_return(self, *foo):
        self.session.close()

@app.task(bind=True, base=MyTask)
def celery_task(self):
    foo

我的问题是,如何才能在一个task创建的时候执行新建session的操作?我在celery的文档中没有找到一个方法是在task创建的时候执行的。

阅读 4.3k
2 个回答

官方文档或源码中有没有相关实现不是很清楚,但是就算没有实现自己实现也是可以的。
相关实现大概率会通过decorator来做,所以自己写一个decorator就可以了

import functools

import flask

@bp.route('status', methods=['GET'])
def status():
  celery_task.delay()
  return flask.jsonify({'status': 'ok'})

def my_task(fn):
  @functools.wraps(fn)
  def func_wrapper(*args, **kwargs):
    try:
      session = DB_Session()
      res = fn('session', *args, **kwargs)
    except:
      pass
    finally:
      session.close()
      return res
  return func_wrapper


@app.task
@my_task
def celery_task(session):
  pass
新手上路,请多包涵

Task.on_bound 是一个类方法,不知道是否能够使用,它的注释如下这样:

Note:
            This class method can be defined to do additional actions when
            the task class is bound to an app.

我也正在做类似的事情:在celery任务中做数据库操作,也遇到类似的烦恼,我先占个坑,测试一下再来更新。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题