InterfaceError:连接已经关闭(使用 django celery Scrapy)

新手上路,请多包涵

在 Celery 任务中使用 Scrapy 解析函数(有时可能需要 10 分钟)时,我得到了这个。

我使用: - Django==1.6.5 - django-celery==3.1.16 - celery==3.1.16 - psycopg2==2.5.5 (我也使用 psycopg2==2.5.4)

[2015-07-19 11:27:49,488: CRITICAL/MainProcess] Task myapp.parse_items[63fc40eb-c0d6-46f4-a64e-acce8301d29a] 内部错误: InterfaceError('connection already closed',)
追溯(最近一次通话):
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/celery/app/trace.py”,第 284 行,在 trace_task 中
    uuid, retval, 成功, request=task_request,
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/celery/backends/base.py”,第 248 行,在 store_result
    请求=请求,**kwargs)
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/backends/database.py”,第 29 行,在 _store_result 中
    traceback=traceback, children=self.current_task_children(请求),
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py”,第 42 行,在 _inner
    返回乐趣(*args,**kwargs)
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py”,第 181 行,在 store_result
    '元':{'孩子':孩子}})
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py”,第 87 行,在 update_or_create
    返回 get_queryset(self).update_or_create(**kwargs)
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py”,第 70 行,在 update_or_create
    obj, 已创建 = self.get_or_create(**kwargs)
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py”,第 376 行,在 get_or_create
    返回 self.get(**lookup), False
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py”,第 304 行,在 get
    num = len(克隆)
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py”,第 77 行,在 __len__
    self._fetch_all()
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py”,第 857 行,在 _fetch_all
    self._result_cache = list(self.iterator())
  迭代器中的文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py”,第 220 行
    对于 compiler.results_iter() 中的行:
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py”,第 713 行,在 results_iter
    对于 self.execute_sql(MULTI) 中的行:
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py”,第 785 行,在 execute_sql
    游标 = self.connection.cursor()
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py”,第 160 行,光标
    cursor = self.make_debug_cursor(self._cursor())
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py”,第 134 行,在 _cursor
    返回 self.create_cursor()
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/utils.py”,第 99 行,在 __exit__
    六.reraise(dj_exc_type, dj_exc_value, traceback)
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py”,第 134 行,在 _cursor
    返回 self.create_cursor()
  文件“/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/postgresql_psycopg2/base.py”,第 137 行,在 create_cursor
    游标 = self.connection.cursor()
InterfaceError:连接已经关闭

原文由 mou55 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 657
1 个回答

不幸的是,这是 django + psycopg2 + celery 组合的问题。这是一个古老而未解决的问题。

查看此线程以了解: https ://github.com/celery/django-celery/issues/121

基本上,当 celery 启动一个 worker 时,它会从 django.db 框架中分叉出一个数据库连接。如果此连接由于某种原因断开,它不会创建新连接。一旦无法使用 django.db 库检测何时断开数据库连接,Celery 就与此问题无关。当它发生时 Django 不会通知,因为它只是启动一个连接并接收一个 wsgi 调用(没有连接池)。我在拥有大量机器工作人员的大型生产环境中遇到了同样的问题,有时,这些机器失去了与 postgres 服务器的连接。

我解决了这个问题,将每个 celery master 进程置于一个 linux supervisord 处理程序和一个 watcher 下,并实现了一个处理 psycopg2.InterfaceError 的装饰器,当它发生时,这个函数会调度一个系统调用,以强制 supervisor 使用 SIGINT 优雅地重启 celery 进程。

编辑:

找到了更好的解决方案。我像这样实现了一个 celery 任务基类:

 from django.db import connection
import celery

class FaultTolerantTask(celery.Task):
    """ Implements after return hook to close the invalid connection.
    This way, django is forced to serve a new connection for the next
    task.
    """
    abstract = True

    def after_return(self, *args, **kwargs):
        connection.close()

@celery.task(base=FaultTolerantTask)
def my_task():
    # my database dependent code here

我相信它也会解决你的问题。

原文由 mannysz 发布,翻译遵循 CC BY-SA 3.0 许可协议

推荐问题