Spark Streaming + Kafka integration fails to build

Environment

Ubuntu 14.04 64-bit
Java 1.8
Scala 2.11.7
Spark 1.5.1 (Hadoop 2.6 build)
IntelliJ IDEA
sbt 0.13

Problem

I made a few small changes to the Spark Streaming example code, but building it with sbt keeps failing.

  • My code

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

//import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}

import java.util.HashMap

object KafkaDemo1 {
    def main(args: Array[String]) {
        if (args.length < 4) {
            System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
            System.exit(1)
        }

        //StreamingExamples.setStreamingLogLevels()

        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("demo1")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint("checkpoint")

        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val input = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

        //val words = input.count() // count the words
        println(input.count())
        println(input.first())

        val words = input.flatMap(_.split(" "))
        println("print first 10 lines")
        words.take(10).foreach(println)
        val wordCount = words.map(x => (x, 1L))
          .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
        wordCount.print()

        ssc.start()
        ssc.awaitTermination()
    }
}
  • My sbt configuration (I suspect the dependencies are the problem)

name := "demo1"
version := "0.0.1"
scalaVersion := "2.11.7"

//libraryDependencies += "org.apache.spark" % "spark-core" % "1.5.1" % "provided"

libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-core_2.11" % "1.5.1" % "provided",
    "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.1"
)
  • Build failure output

[success] Total time: 0 s, completed Oct 23, 2015 10:58:55 AM
[info] Updating {file:/home/adolph/workspace//spark-streaming-kafka/demo1/}demo1...
[info] Resolving org.apache.hadoop#hadoop-mapreduce-client-common;2.2.0 ...
[info] Resolving com.sun.jersey.jersey-test-framework#jersey-test-framewo...
[info] Resolving org.apache.hadoop#hadoop-mapreduce-client-shuffle;2.2.0 ...
[info] Resolving org.apache.hadoop#hadoop-mapreduce-client-jobclient;2.2...
[info] Resolving org.apache.hadoop#hadoop-yarn-server-nodemanager;2.2.0 ...
[info] Resolving org.eclipse.jetty.orbit#javax.servlet;3.0.0.v20111201101...
[info] Resolving org.scala-lang.modules#scala-parser-combinators_2.11;1.0...
[info] Resolving com.fasterxml.jackson.module#jackson-module-scala_2.11;2...
[info] Resolving org.scala-lang.modules#scala-parser-combinators_2.11;1.0...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Compiling 1 Scala source to /home/adolph/workspace/linkernetwork/cloud-data/spark-streaming-kafka/demo1/target/scala-2.11/classes...
[error] /home/adolph/workspace/linkernetwork/cloud-data/spark-streaming-kafka/demo1/demo1.scala:20: not found: type StreamingContext
[error]         val ssc = new StreamingContext(sparkConf, Seconds(2))
[error]                       ^
[error] /home/adolph/workspace/linkernetwork/cloud-data/spark-streaming-kafka/demo1/demo1.scala:20: not found: value Seconds
[error]         val ssc = new StreamingContext(sparkConf, Seconds(2))
[error]                                                   ^
[error] missing or invalid dependency detected while loading class file 'KafkaUtils.class'.
[error] Could not access term api in package org.apache.spark.streaming,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'KafkaUtils.class' was compiled against an incompatible version of org.apache.spark.streaming.
[error] missing or invalid dependency detected while loading class file 'KafkaUtils.class'.
[error] Could not access type JavaStreamingContext in value org.apache.spark.streaming.java,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'KafkaUtils.class' was compiled against an incompatible version of org.apache.spark.streaming.java.
[error] missing or invalid dependency detected while loading class file 'KafkaUtils.class'.
[error] Could not access type StreamingContext in package org.apache.spark.streaming,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'KafkaUtils.class' was compiled against an incompatible version of org.apache.spark.streaming.
[error] 5 errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 8 s, completed Oct 23, 2015 10:59:03 AM

Notes

The problem is probably in one of two places:

  1. The dependencies are not declared correctly.

  2. The packages imported in the code are wrong.

I use IntelliJ IDEA as my IDE, but I have only just started with it and don't know yet how to auto-import packages, so it keeps flagging errors.

  1. Could someone recommend a tutorial on developing Scala applications in IntelliJ IDEA?

  2. Please help me find where the problem is.

5 Answers

The problem is that your Scala version does not match the Scala version that Spark (and Kafka) were built against.

Both Kafka and Spark are written in Scala, and they target a stable Scala release rather than the newest one.

Here you are using Scala 2.11.7, the latest version from the official site, which creates the compatibility problem. Two changes are needed:

  • Switch Scala to 2.10.4, the stable version used by Spark and Kafka.

  • Change the dependencies to their _2.10 builds, as in the sketch below.
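A minimal build.sbt along those lines could look like this sketch (assuming the pre-built Spark 1.5.1 distribution, which targets Scala 2.10). Using %% instead of % tells sbt to append the Scala binary suffix itself, so the artifacts can never drift out of sync with scalaVersion:

name := "demo1"

version := "0.0.1"

// Match the Scala version the Spark binaries were built with
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // %% resolves to spark-core_2.10 etc., always consistent with scalaVersion
  "org.apache.spark" %% "spark-core"            % "1.5.1" % "provided",
  // Listing spark-streaming explicitly guarantees it is on the compile
  // classpath, which is what the "not found: type StreamingContext" and
  // KafkaUtils errors above are complaining about
  "org.apache.spark" %% "spark-streaming"       % "1.5.1" % "provided",
  // The Kafka connector is not part of the Spark runtime, so it is not
  // marked provided and must ship with the application jar
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.1"
)

Note that dependencies marked "provided" still have to be on the classpath at run time; spark-submit supplies them from the Spark installation.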


I am using Scala 2.11 myself, but my Spark is compiled from source, whereas the pre-built packages on the official site are all 2.10 builds. The jars you import must match the Scala version that Spark was compiled with.
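If you are unsure which Scala version a given Spark installation was built with, one quick check (a sketch; run it inside spark-shell) is to print the REPL's own Scala version, since the shell runs on the Scala that the Spark build was compiled against:

// Paste into spark-shell: reports the Scala version of the running
// Spark build, e.g. "version 2.10.4"
println(scala.util.Properties.versionString)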
