kafka怎么做消息缓存?

我是kafka小白
大家见谅~~~

我想做的是 kafka的接受端producer 1条1条消息接受
然后做一个缓存 等缓存到一定数量 比如 100条
再一次性发送给发送端consumer

改怎么写?

阅读 1.7k
2 个回答
  1. 提问中两处错误:生产者producer,而不是接受端producer;消费者consumer,而不是发送端consumer
  2. 在发送时,缓存到一定数量再一次性发送;这个需求不是 生产者配置属性“batch.size”的作用吗?

你可以使用Kafka的批处理功能来实现你的需求。下面是用PHP写的示例代码供参考:

<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost');

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('test');

$cached_messages = [];

while (true) {
    // 从Kafka获取一条消息
    $message = $consumer->consume(1000);
    if ($message === null) {
        continue;
    }

    // 处理消息
    $cached_messages[] = $message->payload;

    // 如果缓存消息的数量达到100,则批量发送给Kafka
    if (count($cached_messages) >= 100) {
        $batch = RdKafka\Producer::MSG_BATCH;
        $topic->produce($batch, 0, implode(PHP_EOL, $cached_messages));
        $cached_messages = [];
    }
}

$producer->flush(10000);

上面的代码中,首先创建了一个生产者对象和一个主题对象。然后,程序会循环从Kafka获取一条消息,将消息添加到缓存中。当缓存消息的数量达到100时,程序会将缓存的消息批量发送给Kafka。

需要注意的是,如果你在缓存消息时设置了时间限制,比如5秒钟,那么你需要在循环结束时将剩余的消息发送给Kafka。

至于如何保证发送的消息顺序和正确性,这取决于你在发送和接收消息时所使用的配置和代码实现。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题