我有一些操作数据库的celery任务,需要在代码开头新建session,在代码结尾关闭session。
当前的方式是:
@app.task
def celery_task():
session = DB_Session()
try:
foo
except:
bar
finally:
session.close()
但问题是,这样对错误处理很不友好。而且一整个任务里都用try...except...finally
感觉很难看。
最近看Celery的官方文档,想到可以新建一个自定义的MyTask
,并在Task.after_return()
中关闭session,然后所有celery任务定义的时候都base=MyTask
。如下:
class MyTask(celery.task):
def after_return(self, *foo):
self.session.close()
@app.task(bind=True, base=MyTask)
def celery_task(self):
foo
我的问题是,如何才能在一个task创建的时候执行新建session的操作?我在celery的文档中没有找到一个方法是在task创建的时候执行的。
官方文档或源码中有没有相关实现不是很清楚,但是就算没有实现自己实现也是可以的。
相关实现大概率会通过decorator来做,所以自己写一个decorator就可以了