如何将 Spark Streaming 数据转换为 Spark DataFrame

新手上路,请多包涵

到目前为止,Spark还没有创建流式数据的DataFrame,但是我在做异常检测的时候,使用DataFrame进行数据分析更加方便快捷。我已经完成了这部分,但是当我尝试使用流数据进行实时异常检测时,问题出现了。试了好几种方法,仍然无法将DStream转为DataFrame,也无法将DStream内部的RDD转为DataFrame。

这是我最新版本的代码的一部分:

 import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator

sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

正如你在 main() 函数中看到的,当我使用 ssc.socketTextStream() 方法读取输入流数据时,它会生成 DStream,然后我尝试将 DStream 中的每个个体转换为 Row,希望我可以将数据转换为数据帧稍后。

如果我在这里使用 ppprint() 打印出 features_rdd,它会起作用,这让我想到,features_rdd 中的每个个体都是一批 RDD,而整个 features_rdd 是一个 DStream。

然后我创建了 streamrdd_to_df() 方法并希望将每批 RDD 转换为数据帧,它给了我错误,显示:

错误 StreamingContext:启动上下文时出错,将其标记为已停止 java.lang.IllegalArgumentException:要求失败:未注册输出操作,因此无需执行

有没有想过如何对 Spark 流数据进行 DataFrame 操作?

原文由 Cherry Wu 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 586
2 个回答
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题