Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema
at To_CK.main(To_CK.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.serialization.DeserializationSchema
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 1 more
代码如下
class CkSinkBuilder extends JdbcStatementBuilder[(Int, String, String)] {
def accept(ps: PreparedStatement, v: (Int, String, String)): Unit = {
ps.setInt(1, v._1)
ps.setString(2, v._2)
ps.setString(3, v._3)
}
}
object To_CK {
def main(args: Array[String]): Unit = {
//获得环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) //设置并发为1,防止打印控制台乱序
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //Flink 默认使用 ProcessingTime 处理,设置成event time
val tEnv = StreamTableEnvironment.create(env) //Table Env 环境
//从Kafka读取数据
val pros = new Properties()
pros.setProperty("bootstrap.servers", "192.168.10.102:9092")
pros.setProperty("group.id", "test_1")
pros.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
pros.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
pros.setProperty("auto.offset.reset", "latest")
import org.apache.flink.api.scala._
val dataSource = env.addSource(new FlinkKafkaConsumer[String]("flink_test_1", new SimpleStringSchema(), pros))
val sql="insert into flink_test_1.test_1(userid,items,create_date)values(?,?,?)"
val result = dataSource.map(line => {
val x = line.split("\t")
//print("收到数据",x(0),x(1),x(2),"\n")
val member_id = x(0).trim.toLong
val item = x(1).trim
val times = x(2).trim
var time = 0l
try{time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(times).getTime} //时间戳类型
catch {case e: Exception => {print( e.getMessage)}}
(member_id.toInt, item.toString ,time.toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Int, String, Long)](Time.seconds(2)) {
override def extractTimestamp(t: (Int, String, Long)): Long = t._3
}).map(x=>{(x._1,x._2,new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(x._3))}) //时间还原成datetime类型
//result.print()
result.addSink(JdbcSink.sink[(Int,String,String)](sql,new CkSinkBuilder,new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:clickhouse://192.168.10.102:8123")
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUsername("flink_test_1")
.build()
))
env.execute("To_CK")
}
}
org/apache/flink/api/common/serialization/DeserializationSchema
这个类,在根据需要,设置相应的version即可。