各位大神, 求解!
用python来实现spark streaming 读取kafka数据
本人首先查阅了大量的资料, 最开始使用spark-streaming-kafka,经过踩坑后返现spark-streaming-kafka-0.8版本的支持python语言,但是不支持kafka的身份认证。更高版本的只支持scala和java, 最后无奈放弃。
后来参考了官方文档:
https://spark.apache.org/docs/2.4.0/structured-streaming-kafk...
看了官方文档之后打算使用spark-sql-kafka-0-10_2.11,这个应该是支持kafka身份认证的。
但是写的代码报错,报错如下:
py4j.protocol.Py4JJavaError: An error occurred while calling o39.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:487)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:414)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:66)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:209)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:95)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:95)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:171)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 20 more
贴出我写的代码如下:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("KafkaExample")\
.getOrCreate()
kafkaConf = {
"kafka.bootstrap.servers": "xxxxxx:9092",
"subscribe": "topic",
"kafka.auto.offset.reset": "earliest",
"kafka.group.id": "default",
"kafka.security.protocol": "SASL_PLAINTEXT",
"kafka.sasl.mechanism": "SCRAM-SHA-256",
"kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='${username}' password='${password}';",
"kafka.partition.assignment.strategy": "org.apache.kafka.clients.consumer.RangeAssignor"
}
# 创建 Kafka 数据源
df = spark.readStream.format("kafka").options(**kafkaConf).load()
# .option("kafka.partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor") \
# 对从Kafka接收到的数据进行处理
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show# 查询数据
query = df.writeStream \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()
根据错误信息,看起来是缺少 Kafka 的 ByteArrayDeserializer 类。可能是你的环境没有正确配置 Kafka 客户端的依赖项。请尝试确认是否已经正确安装并配置了 Kafka 客户端。另外,你可以尝试使用 pip 安装 Kafka 客户端的依赖,例如:
此外,你还需要在 Spark 中将 Kafka 客户端依赖项添加到 classpath 中,你可以在代码中添加以下行来执行此操作:
注意,这里的路径需要指向正确的 Kafka 客户端 jar 包。
如果问题仍然存在,请尝试使用其他 Python 的 Kafka 客户端库来实现读取 Kafka 数据,例如 kafka-python 或 confluent-kafka-python。同时,也可以考虑使用其他的流处理框架,例如 Streamlit 等,来读取 Kafka 数据。
在使用
spark.sparkContext.addPyFile('/path/to/kafka-clients.jar')
添加依赖包后,需要确保在 Spark 应用程序中正确引入 Kafka 相关的类库。一种常见的方法是使用
spark-submit
命令提交 Spark 应用程序时,将 Kafka 相关的 JAR 包以--jars
的参数形式添加到 Spark 的 Classpath 中。例如:其中
my_spark_app.py
是你的 Spark 应用程序的入口文件。这样,Spark 应用程序就可以在运行时正确地找到并加载 Kafka 相关的类库。另外,确保 Kafka 服务器正常运行,并在你的应用程序代码中正确配置了 Kafka 连接参数,以便正确连接和操作 Kafka。