在谷歌搜索:
rabbitmq
生产者批量插入消息kafka
生产者批量插入消息
出来的词条都是不相关的,是它俩都不支持吗?
可是我觉得这是一个很常用的需求呀!
在谷歌搜索:
rabbitmq
生产者批量插入消息kafka
生产者批量插入消息出来的词条都是不相关的,是它俩都不支持吗?
可是我觉得这是一个很常用的需求呀!
以 Python 的 rabbitpy 库为例子测试
在同一个连接中连续 publish 3w 次:
import rabbitpy
import time
from loguru import logger
from uuid import uuid4
start_at = time.time()
# Connect to RabbitMQ on localhost, port 5672 as guest/guest
with rabbitpy.Connection('amqp://guest:guest@192.168.31.203:5672/') as conn:
# Open the channel to communicate with RabbitMQ
with conn.channel() as channel:
# Start the transaction
# tx = rabbitpy.Tx(channel)
# tx.select()
# Create the message to publish & publish it
for i in range(30000):
message = rabbitpy.Message(channel, f'message body value {uuid4().hex}')
message.publish('ponponon', 'seckill.*')
# Rollback the transaction
# tx.rollback()
end_at = time.time()
logger.debug(f'耗时 {end_at-start_at}s')
输出:
2022-03-31 21:42:24.119 | DEBUG | __main__:<module>:27 - 耗时 3.218639850616455s
RPS 很高,有 10000 的样子。
再来看看反面例子:
每次 publish 都创建一个新的连接:
import rabbitpy
import time
from loguru import logger
from uuid import uuid4
start_at = time.time()
for _ in range(10):
# Connect to RabbitMQ on localhost, port 5672 as guest/guest
with rabbitpy.Connection('amqp://guest:guest@192.168.31.203:5672/') as conn:
# Open the channel to communicate with RabbitMQ
with conn.channel() as channel:
# Start the transaction
# tx = rabbitpy.Tx(channel)
# tx.select()
# Create the message to publish & publish it
message = rabbitpy.Message(channel, f'message body value {uuid4().hex}')
message.publish('ponponon', 'seckill.*')
# Rollback the transaction
# tx.rollback()
end_at = time.time()
logger.debug(f'耗时 {end_at-start_at}s')
输出:
2022-03-31 21:45:47.328 | DEBUG | __main__:<module>:27 - 耗时 4.212406158447266s
可以看到,效率很低很低,有多低呢?
RPS 只有 2 的样子
所以,rabbitpy
不需要批量提交的 api,因为 rabbitmq
用是基于 TCP 的 AMQP 协议,已经很高效了!
1 回答1.3k 阅读✓ 已解决
2 回答2k 阅读
这里以PHP描述为例;demo里面
单个发送:$channel->basic_publish();
多个发送:$channel->batch_basic_publish();
其他语言不太清楚,应该也是可以的