在使用Spark Streaming向HDFS中保存数据时,文件内容会被覆盖掉,怎么解决?

我的Spark Streaming代码如下所示:

val lines=FlumeUtils.createStream(ssc,"hdp2.domain",22222,StorageLevel.MEMORY_AND_DISK_SER_2)

val words = lines.filter(examtep(_))
words.foreachRDD(exam(_))

//some other code

 def exam(rdd:RDD[SparkFlumeEvent]):Unit={
    if(rdd.count()>0) {
      println("****Something*****")
      val newrdd=rdd.map(sfe=>{
      val tmp=new String(sfe.event.getBody.array())
      tmp
      })
    newrdd.saveAsTextFile("/user/spark/appoutput/Temperaturetest")
    }
}

当words.foreachRDD(exam(_))中每次执行exam()方法的时候,都会执行newrdd.saveAsTextFile("/user/''''''"),但是HDFS上Temperaturetest文件夹里的内容每次都会被覆盖掉,只保存着最后一次saveAsTextFIle的内容,怎样才能让所有数据都存储到Temperaturetest中呢??

PS:我的Spark版本为1.2.1

阅读 22.5k
4 个回答
新手上路,请多包涵

朋友你好 请问这个问题你解决了没? 我也遇到这样的问题了 能否指点一二

新手上路,请多包涵

转为data_frame,然后用sql语句执行想要的操作。
类似于如下:

    def process(time,rdd):
        try:            
            hqlContext = getHqlContextInstance(rdd.context)
            
            hqlContext.sql("use shy")
            userframe =hqlContext.createDataFrame(rdd,['create_time','userid','amount'])
            userframe.registerTempTable("userframe")
            hqlContext.sql("insert overwrite table trade_order_temp select * from userframe")
            
            
            print "************"
            
        except Exception,e:
            print e

    rowrdd.foreachRDD(process)
新手上路,请多包涵

我也遇到了相同的问题。foreachRDD默认保存的文件名就是part0000_ ... part0000n,每一个rdd都是这样。所以在同一路径下后面的文件可能会覆盖前面的。我是在文件夹后面再加上时间戳,来避免覆盖。不知道还有没有更好的方法
newrdd.saveAsTextFile("/user/spark/appoutput/Temperaturetest"+System.currentTimeMillis())

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进