scala连接kafka失败 如何解决?

按照官方文档的示例,写了以下程序:

def main(args: Array[String]):Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("ylcx_miniApp_mall_recommendation");

    val streamingContext: StreamingContext = new StreamingContext(sparkConf,Seconds(5));

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "X.X.X.X:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "user",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("mallRec")

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(record => println(record.key+' '+record.value))
    stream.print()
    streamingContext.start();
    streamingContext.awaitTermination();
  }

运行以后会有以下错误:

21/09/28 12:18:26 WARN NetworkClient: [Consumer clientId=consumer-user-1, groupId=user] Bootstrap broker X.X.X.X:9092 (id: -1 rack: null) disconnected

安全组端口已经确认打开了

如何解决?

阅读 1.3k
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题