IDEA运行flink程序报错怎么改?

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema
    at To_CK.main(To_CK.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.serialization.DeserializationSchema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 1 more

代码如下

/**
 * Statement builder for the JDBC sink: copies the three fields of an
 * `(Int, String, String)` record into positional parameters 1..3 of the
 * prepared statement, in tuple order.
 */
class CkSinkBuilder extends JdbcStatementBuilder[(Int, String, String)] {
  def accept(ps: PreparedStatement, v: (Int, String, String)): Unit = {
    // Destructure once so each bind names the field it carries.
    val (first, second, third) = v
    ps.setInt(1, first)
    ps.setString(2, second)
    ps.setString(3, third)
  }
}

/**
 * Streaming job that reads tab-separated lines from the Kafka topic
 * `flink_test_1`, assigns event-time timestamps taken from the third field,
 * and inserts the rows into `flink_test_1.test_1` through the JDBC sink
 * (ClickHouse driver).
 *
 * Expected line layout: `userid \t item \t yyyy-MM-dd HH:mm:ss`.
 */
object To_CK {

  /** Date-time layout used to parse the incoming field and to format it back for the sink. */
  private val DateTimePattern = "yyyy-MM-dd HH:mm:ss"

  /**
   * Parses the date-time field into epoch milliseconds.
   *
   * A value that does not match [[DateTimePattern]] is reported on stderr and
   * mapped to `0L`, so the record is still emitted (same fallback as before).
   * A fresh `SimpleDateFormat` is created per call because the class is not
   * thread-safe.
   *
   * @param raw the already trimmed date-time text
   * @return epoch milliseconds, or `0L` when `raw` cannot be parsed
   */
  private def parseEventTime(raw: String): Long =
    try new SimpleDateFormat(DateTimePattern).parse(raw).getTime
    catch {
      case e: java.text.ParseException =>
        System.err.println(s"Unparseable timestamp, falling back to 0: ${e.getMessage}")
        0L
    }

  /**
   * Converts one Kafka line into a `(userid, item, eventTimeMillis)` record.
   *
   * Lines with fewer than three tab-separated fields, or whose first field is
   * not a valid `Int`, are reported on stderr and dropped instead of throwing
   * inside the operator (which would fail the whole job).
   *
   * @param line raw line read from Kafka
   * @return the parsed record, or `None` when the line is malformed
   */
  private def parseRecord(line: String): Option[(Int, String, Long)] =
    line.split("\t") match {
      case Array(rawId, rawItem, rawTime, _*) =>
        try {
          // Parse straight to Int: going through Long and narrowing would
          // silently truncate ids that do not fit.
          val memberId = rawId.trim.toInt
          Some((memberId, rawItem.trim, parseEventTime(rawTime.trim)))
        } catch {
          case _: NumberFormatException =>
            System.err.println(s"Dropping record with invalid user id: $line")
            None
        }
      case _ =>
        System.err.println(s"Dropping record with fewer than 3 fields: $line")
        None
    }

  def main(args: Array[String]): Unit = {

    // Obtain the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 keeps console output in order
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // switch from the default processing time to event time
    val tEnv = StreamTableEnvironment.create(env) // Table environment (not used further in this job)

    // Kafka consumer configuration.
    val pros = new Properties()
    pros.setProperty("bootstrap.servers", "192.168.10.102:9092")
    pros.setProperty("group.id", "test_1")
    // NOTE(review): the Flink Kafka consumer decodes values with the schema passed
    // to its constructor; these two deserializer settings are presumably ignored - confirm.
    pros.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pros.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pros.setProperty("auto.offset.reset", "latest")

    // Brings the implicit TypeInformation instances needed by the Scala DataStream API into scope.
    import org.apache.flink.api.scala._
    val dataSource = env.addSource(new FlinkKafkaConsumer[String]("flink_test_1", new SimpleStringSchema(), pros))
    val sql = "insert into flink_test_1.test_1(userid,items,create_date)values(?,?,?)"

    val result = dataSource
      // flatMap instead of map so malformed lines can be skipped.
      .flatMap((line: String) => parseRecord(line).toList)
      // Event time is the third tuple field; tolerate 2 seconds of out-of-orderness.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[(Int, String, Long)](Time.seconds(2)) {
          override def extractTimestamp(t: (Int, String, Long)): Long = t._3
        })
      // Turn the epoch milliseconds back into a date-time string for the sink.
      .map(x => (x._1, x._2, new SimpleDateFormat(DateTimePattern).format(x._3)))

    //result.print()
    result.addSink(JdbcSink.sink[(Int, String, String)](
      sql,
      new CkSinkBuilder,
      new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:clickhouse://192.168.10.102:8123")
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUsername("flink_test_1")
        .build()
    ))

    env.execute("To_CK")
  }

}
阅读 2.4k
1 个回答

`org.apache.flink.api.common.serialization.DeserializationSchema` 这个类位于 flink-core 模块中,需要在 pom.xml 中添加如下依赖:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-core</artifactId>
   <version>(xxx-version-xxx)</version>
</dependency>

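根据需要,将 version 设置为与项目中其他 Flink 依赖一致的版本即可。如果该依赖已经存在但 scope 为 provided,在 IDEA 中直接运行时它不会被加入 classpath,同样会报这个错误;此时需要在运行配置(Run/Debug Configurations)中勾选把 provided 依赖加入 classpath 的选项,或在本地调试时去掉 provided。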


本文参与了SegmentFault 思否面试闯关挑战赛,欢迎正在阅读的你也加入。
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进