比如我使用kafka或rabbitmq从消息队列中获取数据,来源只有一条队列,不能从数据源分流数据。而计算难度较高,所以要在内部进行分流。比如我的消息处理类为:
class Worker(threading.Thread):
def __init__(self):
self.raw = []
def run(self):
while True:
if self.raw:
d = self.raw.pop()
处理d数据
将结果保存到批量插入的类中
然后在程序运行时,我创建若干个Worker,然后将消息源传入的数据,分流保存到这若干个Worker的raw属性中。问题就在于,如何能开销比较低的分流这些数据?
还有这样多线程处理的思路是否正确?我之前测试过,感觉python自带的Queue的效率并不是特别高。
另外,假如我要设计弹性创建Worker,应该如何用python代码完成,就是当数据流较大时,我就追加创建一些Worker,当数据流降低时,就销毁一些Worker。
对 cpython 来说,耗 CPU 资源的运算,应该使用多进程而不是多线程。
在本例中,有两个地方可以做"分流",其一是如你所提的,在接收到数据后作分配,另一个是利用消息框架本身。
以 RabbitMQ 为例,你可以参考这个教程 Work Queues,https://www.rabbitmq.com/tuto...
其它诸如“找更省开销的方法”,“弹性工作池”等等,不妨放一下,先把功能实现了,再针对瓶颈做优化,以达到事半功倍的效果。
关于优化 python 性能,有一篇文章可以参考下 https://pypy.org/performance....