如何通过Java在Kafka中创建Topic

新手上路,请多包涵

我想通过java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且通过 java api 推送消息,它工作正常。但是我想通过java api创建一个主题。经过长时间的搜索,我找到了下面的代码,

 ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

我试过上面的代码,它显示主题已创建,但我无法在主题中推送消息。我的代码有什么问题吗?或任何其他方式来实现上述目标?

原文由 Jaya Ananthram 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 839
2 个回答

编辑- 较新版本的 Kafka 不需要 Zookeeper。请参阅 @Neeleshkumar Srinivasan Mannur 对 API 版本 0.11.0+ 的回答



原答案

我修好了..经过长时间的研究..

 ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

从上面的代码来看,ZkClient 会创建一个topic,但是这个topic 的信息不会被kafka 感知到。所以我们要做的是,我们需要通过以下方式为 ZkClient 创建对象,

首先导入下面的语句,

 import kafka.utils.ZKStringSerializer$;

并通过以下方式为 ZkClient 创建对象,

 ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());


编辑 1:(@ajkret 评论)

上述代码不适用于 kafka > 0.9,因为 api 已更改,请使用以下代码用于 kafka > 0.9


 import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}

原文由 Jaya Ananthram 发布,翻译遵循 CC BY-SA 4.0 许可协议

这个过程在 API 0.11.0+ 中似乎大大简化了。使用它,可以按如下方式完成

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

Properties properties = new Properties();
properties.load(new FileReader(new File("kafka.properties")));

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor)

List<NewTopic> newTopics = new ArrayList<NewTopic>();
newTopics.add(newTopic);

adminClient.createTopics(newTopics);
adminClient.close();

kafka.properties 文件内容如下

bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

请注意,必须关闭 AdminClient 的实例才能反映新创建的主题。

原文由 Neeleshkumar S 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题