引入了kafka-clients跟spring-kafka两个包,版本分别是0.11.0.0和1.3.8,网上找了一些spring集成kafka的帖子都没有提及如何加入用户认证,也就是security.protocol和sasl.mechanism以及kafka_client_jaas配置文件,看到网上有用spring boot配置的,但是不知道转换成xml的形式是怎么样的?我用了网上找的配置方法后无论放在consumerProperties还是containerProperties里都会报错,没法设置,不知道是版本的问题还是设置有问题
spring boot代码:
consumer:
bootstrap-servers: 192.168.186.130:9092,192.168.186.131:9092,192.168.186.132:9092
group-id: group-1
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
我的代码:
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}"/>
<entry key="group.id" value="${group.id}"/>
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="max.poll.records" value="10"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
<!-- <property name="security.protocol" value="${security.protocol}"/>
<property name="sasl.mechanism" value="${sasl.mechanism}"/> -->
</bean>
<!-- 创建consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!-- 实际执行消息消费的类 -->
<bean id="messageListernerConsumerService" class="com.rongdu.zgj.modules.dsc.kafka.KafkaConsumerListener"/>
<!-- 消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="${topic}"/>
<property name="messageListener" ref="messageListernerConsumerService"/>
</bean>
<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
<property name="concurrency" value="5"/>
</bean>