在object kafkaStream中获得了Kafka的数据输入流
我想把这个流传给别的类object
如果把kafkaStream放到最后返回 那么会报错:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
如果kafkaStream放在streamingContext.start()前
那就没法返回结果了
该怎么写?
源代码:
def readFromKafka(sc:SparkContext):InputDStream[ConsumerRecord[String,String]] = {
val streamingContext:StreamingContext = new StreamingContext(sc,Seconds(5))
sc.setLogLevel("WARN")
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> "127.0.0.1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test-consumer-group",
"auto.offset.reset" -> "latest",
"max.poll.interval.ms" -> "60000",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topic = Array("ccData")
val kafkaStream = KafkaUtils.createDirectStream[String,String](
streamingContext,
PreferConsistent,
Subscribe[String,String](topic,kafkaParams)
)
streamingContext.start()
streamingContext.awaitTermination()
kafkaStream
}
要将 Kafka 数据流传递给另一个类或对象,可以考虑定义一个回调函数并将其作为参数传递给 readFromKafka 函数。这样,您可以在 DStream 处理过程中调用此回调函数来将数据传递给其他类或对象。
首先,修改 readFromKafka 函数以接受一个回调函数参数。回调函数将接受一个类型为 RDD[ConsumerRecord[String, String]] 的参数。
接下来,在 DStream 上使用 foreachRDD 操作,并在其中调用回调函数。这将在每个批次中处理数据并将其传递给其他类或对象。这里是修改后的代码:
现在,您可以为其他类或对象定义一个处理数据的方法,并将其作为回调函数传递给 readFromKafka 函数。例如:
这将使您能够在每个批次中处理数据,并将其传递给其他类或对象。