falsk celery task 数据库查询

在使用celery时,函数中修改数据库,commit后调用celery的task查询刚才添加的数据进行其他操作,总会出现找不到的情况,求解。

调用task部分代码

        ...
        try:
            db.session.commit()
        except Exception as e:
            current_app.logger.error(str(e))
            db.session.rollback()
            if not ci_existed:  # only add
                self.delete(ci.ci_id)
            return abort(500, "add CI error")
        his_manager = CIAttributeHistoryManger()
        his_manager.add(ci.ci_id, histories)
        ci_cache.apply_async([ci.ci_id], queue="async")
        # add bj ci
        add_ci_bj.apply_async([ci_type.type_name, None, ci.ci_id], queue="async")
        return ci.ci_id

celery的task函数

@celery.task(name="xxxxxxx", queue="async")
def add_ci_bj(ci_type, first_id, second_id):
    param, status = lib.ci.CIManager().get_relations(first_id, second_id, is_async=True)
    ...

task中调用的函数

def get_relations(self, first_id, second_id, is_async=False):
    start = time.clock()
    try:
        second = self.get_ci_by_id(second_id, need_children=False)
    except Exception as e:
        return None, "get ci by id error: first %s, second %s, e %s, is_async:%s" % \
               (first_id, second_id, e, is_async)
    ...

get_relation中的exception老是被触发,也就是查不到second_id对应的数据,有遇到过的么???求解

阅读 3.8k
1 个回答

这个问题确实是比较坑,养成好习惯,使用前先db.session.Close(),万一别人没Close呢!!!

  • flask-sqlalchemy会默认创建一个连接池,里面存放着一定数量的可用连接,并且flask-sqlalchemy会维护当前连接的状态。

  • db.session.Close()不是断开连接,而是将连接放回连接池,再次调用时会重新获取到新的最新状态的可用连接。

  • server-A调用db.session..时,假设获取到的连接为connection-A,通过该连接对数据库进行commit操作后,flask-sqlalchemy会记住当前连接状态,所以再次调用db.session.query...(仍然使用的是connection-A)查询刚插入的数据没有任何问题。

  • server-B运行时已经获得connection-B,当没有调用db.session.Close()就直接调用db.session.query.时,连接状态并没有更新,从而取不到server-A新插入的数据。

之前没有在多服务器使用过,所以没遇到,不Close也没什么问题。还有些东西也还没完全搞明白,得看flask-sqlalchemy的源码,为什么不同的状态获取到的数据不同?为什么不在commit之后主动刷新所有连接呢?

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进