spark如何输出stream到别的类?

在object kafkaStream中获得了Kafka的数据输入流
我想把这个流传给别的类object

如果把kafkaStream放到最后返回 那么会报错:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

如果kafkaStream放在streamingContext.start()前
那就没法返回结果了

该怎么写?

源代码:

def readFromKafka(sc:SparkContext):InputDStream[ConsumerRecord[String,String]] = {

    val streamingContext:StreamingContext = new StreamingContext(sc,Seconds(5))

    sc.setLogLevel("WARN")

    val kafkaParams = Map[String,Object](
      "bootstrap.servers" -> "127.0.0.1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test-consumer-group",
      "auto.offset.reset" -> "latest",
      "max.poll.interval.ms" -> "60000",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topic = Array("ccData")

    val kafkaStream = KafkaUtils.createDirectStream[String,String](
      streamingContext,
      PreferConsistent,
      Subscribe[String,String](topic,kafkaParams)
    )

    streamingContext.start()
    streamingContext.awaitTermination()

    kafkaStream
  }
阅读 2.1k
1 个回答

要将 Kafka 数据流传递给另一个类或对象,可以考虑定义一个回调函数并将其作为参数传递给 readFromKafka 函数。这样,您可以在 DStream 处理过程中调用此回调函数来将数据传递给其他类或对象。

首先,修改 readFromKafka 函数以接受一个回调函数参数。回调函数将接受一个类型为 RDD[ConsumerRecord[String, String]] 的参数。


import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

def readFromKafka(
    sc: SparkContext,
    processData: RDD[ConsumerRecord[String, String]] => Unit
): Unit = {
  ...
}

接下来,在 DStream 上使用 foreachRDD 操作,并在其中调用回调函数。这将在每个批次中处理数据并将其传递给其他类或对象。这里是修改后的代码:


def readFromKafka(
    sc: SparkContext,
    processData: RDD[ConsumerRecord[String, String]] => Unit
): Unit = {

  val streamingContext: StreamingContext = new StreamingContext(sc, Seconds(5))

  sc.setLogLevel("WARN")

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "127.0.0.1:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "test-consumer-group",
    "auto.offset.reset" -> "latest",
    "max.poll.interval.ms" -> "60000",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val topic = Array("ccData")

  val kafkaStream = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topic, kafkaParams)
  )

  // 使用 foreachRDD 操作并调用回调函数
  kafkaStream.foreachRDD(rdd => {
    processData(rdd)
  })

  streamingContext.start()
  streamingContext.awaitTermination()
}

现在,您可以为其他类或对象定义一个处理数据的方法,并将其作为回调函数传递给 readFromKafka 函数。例如:


object OtherClass {
  def processData(rdd: RDD[ConsumerRecord[String, String]]): Unit = {
    // 在这里处理数据
  }
}
// 在 main 函数或其他适当的地方调用 readFromKafka 函数
readFromKafka(sparkContext, OtherClass.processData)

这将使您能够在每个批次中处理数据,并将其传递给其他类或对象。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题