python 如何链接kafka 本机是否要安装kafka?

我电脑没有装过kafka直接用下面的代码模拟python kafka消费数据报错“kafka.errors.NoBrokersAvailable: NoBrokersAvailable” 是不是由于没有安装kafka的原因。
如果我想模拟kafka读一个excle 然后我去从kafka消费这个excel的数据,请问该如何实现呢?
如果有同学愿意帮助,愿提供话费酬谢,谢谢各位~

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json

class Kafka_producer():
    '''
    使用kafka的生产模块
    '''

    def __init__(self, kafkahost,kafkaport, kafkatopic):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.producer = KafkaProducer(bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
            ))

    def sendjsondata(self, params):
        try:
            parmas_message = json.dumps(params)
            producer = self.producer
            producer.send(self.kafkatopic, parmas_message.encode('utf-8'))
            producer.flush()
        except KafkaError as e:
            print (e)


class Kafka_consumer():
    '''
    使用Kafka—python的消费模块
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
                                      bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort ))

    def consume_data(self):
        try:
            for message in self.consumer:
                # print json.loads(message.value)
                yield message
        except KeyboardInterrupt as  e:
            print (e)


def main():
    '''
    测试consumer和producer
    :return:
    '''
    ##测试生产模块
    producer = Kafka_producer("127.0.0.1", 9092, "ranktest")
    for id in range(10):
        params = '{abetst}:{null}---'+str(id)
        producer.sendjsondata(params)
    ##测试消费模块
    #消费模块的返回格式为ConsumerRecord(topic=u'ranktest', partition=0, offset=202, timestamp=None,
    #\timestamp_type=None, key=None, value='"{abetst}:{null}---0"', checksum=-1868164195,
    #\serialized_key_size=-1, serialized_value_size=21)
    consumer = Kafka_consumer('127.0.0.1', 9092, "ranktest", 'test-python-ranktest')
    message = consumer.consume_data()
    for i in message:
        print (i.value)


if __name__ == '__main__':
    main()
阅读 4.6k
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进