How do I use Spark Streaming to read Kafka data in Python?

I'm new to this, so please bear with me. Could any experts here help me out?

I want to use Spark Streaming to read data from Kafka, implemented in Python.

I started by reading through a lot of material. My first attempt used spark-streaming-kafka; after some trial and error I found that the spark-streaming-kafka-0-8 connector supports Python but not Kafka authentication, while the newer versions support only Scala and Java, so I reluctantly gave up on that route.
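
For context, the legacy DStream-based code I tried looked roughly like the sketch below (the broker address and topic name are placeholders). As far as I can tell, the Python createDirectStream API has nowhere to supply SASL credentials, which is the limitation I ran into:

# Legacy DStream approach (spark-streaming-kafka-0-8); sketch only,
# broker and topic are placeholders.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="LegacyKafkaExample")
ssc = StreamingContext(sc, 5)  # 5-second micro-batches

# kafkaParams takes plain consumer settings, but the 0.8 consumer
# protocol predates SASL, so there is no way to pass SCRAM credentials.
stream = KafkaUtils.createDirectStream(
    ssc, ["topic"], {"metadata.broker.list": "xxxxxx:9092"})
stream.map(lambda kv: kv[1]).pprint()  # print message values

ssc.start()
ssc.awaitTermination()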

I then consulted the official documentation:
https://spark.apache.org/docs/2.4.0/structured-streaming-kafk...

After reading the docs I decided to use spark-sql-kafka-0-10_2.11, which should support Kafka authentication.
But the code I wrote throws the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o39.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:487)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:414)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:66)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:209)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:95)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:95)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:171)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 20 more
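
From searching around, my guess is that this ClassNotFoundException means the kafka-clients jar is missing from the classpath: spark-sql-kafka-0-10 depends on it but does not bundle it. I assume the connector has to be pulled in together with its dependencies, e.g. via spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0, or equivalently in code (a sketch, assuming Spark 2.4.0 built against Scala 2.11):

# Sketch: have Spark resolve the connector and its kafka-clients
# dependency at launch (assumes Spark 2.4.0 / Scala 2.11).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("KafkaExample")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0")
         .getOrCreate())

Is that the right way to fix the dependency, or am I missing something else?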

Here is the code I wrote:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KafkaExample")\
    .getOrCreate()

# Options prefixed with "kafka." are passed through to the Kafka
# consumer; the others are Spark source options.
kafkaConf = {
    "kafka.bootstrap.servers": "xxxxxx:9092",
    "subscribe": "topic",
    "kafka.auto.offset.reset": "earliest",
    "kafka.group.id": "default",
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.sasl.mechanism": "SCRAM-SHA-256",
    "kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='${username}' password='${password}';",
    "kafka.partition.assignment.strategy": "org.apache.kafka.clients.consumer.RangeAssignor"
}

# Create the Kafka streaming source
df = spark.readStream.format("kafka").options(**kafkaConf).load()

# .option("kafka.partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor") \
# Cast the binary key/value columns from Kafka to strings; show() is
# not supported on a streaming DataFrame, so the result is written to
# the console sink below instead.
out = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

query = out.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
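
One more thing I'm unsure about: the Spark 2.4 Kafka integration guide lists auto.offset.reset and group.id among the Kafka options the source does not accept, with offsets controlled through the source option startingOffsets instead. So once the classpath problem is solved, I assume the options would need to look more like this (a sketch; the SASL entries are unchanged from my code above):

# Same options, with the entries the Kafka source rejects replaced by
# their Spark equivalents per the 2.4 integration guide:
# kafka.auto.offset.reset -> startingOffsets; kafka.group.id dropped,
# since Spark manages its own consumer group.
kafkaConf = {
    "kafka.bootstrap.servers": "xxxxxx:9092",
    "subscribe": "topic",
    "startingOffsets": "earliest",
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.sasl.mechanism": "SCRAM-SHA-256",
    "kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='${username}' password='${password}';",
}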