我是kafka小白
大家见谅~~~
我想做的是 kafka的接受端producer 1条1条消息接受
然后做一个缓存 等缓存到一定数量 比如 100条
再一次性发送给发送端consumer
改怎么写?
我是kafka小白
大家见谅~~~
我想做的是 kafka的接受端producer 1条1条消息接受
然后做一个缓存 等缓存到一定数量 比如 100条
再一次性发送给发送端consumer
改怎么写?
你可以使用Kafka的批处理功能来实现你的需求。下面是用PHP写的示例代码供参考:
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('test');
$cached_messages = [];
while (true) {
// 从Kafka获取一条消息
$message = $consumer->consume(1000);
if ($message === null) {
continue;
}
// 处理消息
$cached_messages[] = $message->payload;
// 如果缓存消息的数量达到100,则批量发送给Kafka
if (count($cached_messages) >= 100) {
$batch = RdKafka\Producer::MSG_BATCH;
$topic->produce($batch, 0, implode(PHP_EOL, $cached_messages));
$cached_messages = [];
}
}
$producer->flush(10000);
上面的代码中,首先创建了一个生产者对象和一个主题对象。然后,程序会循环从Kafka获取一条消息,将消息添加到缓存中。当缓存消息的数量达到100时,程序会将缓存的消息批量发送给Kafka。
需要注意的是,如果你在缓存消息时设置了时间限制,比如5秒钟,那么你需要在循环结束时将剩余的消息发送给Kafka。
至于如何保证发送的消息顺序和正确性,这取决于你在发送和接收消息时所使用的配置和代码实现。
3 回答3.4k 阅读✓ 已解决
1 回答4.5k 阅读
2 回答1.3k 阅读
1 回答2.2k 阅读
1 回答1.7k 阅读
2 回答2k 阅读
1 回答509 阅读✓ 已解决