我有一个由一列组成的 pyspark 数据框,名为 json
,其中每一行都是 json 的 unicode 字符串。我想解析每一行并返回一个新的数据框,其中每一行都是解析后的 json。
# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])
我尝试使用 json.loads
映射每一行:
(df
.select('json')
.rdd
.map(lambda x: json.loads(x))
.toDF()
).show()
但这会返回一个 TypeError: expected string or buffer
我怀疑部分问题是当从 dataframe
转换为 rdd
时,架构信息丢失,所以我也尝试手动输入架构信息:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
但我得到相同的 TypeError
。
看看 这个答案,看起来用 flatMap
可能在这里很有用,但我也没有成功:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.flatMap(lambda x: x)
.flatMap(lambda x: json.loads(x))
.map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
我收到此错误: AttributeError: 'unicode' object has no attribute 'get'
。
原文由 Steve 发布,翻译遵循 CC BY-SA 4.0 许可协议
如果您之前将数据帧转换为字符串的 RDD,则将具有 json 字符串的数据帧转换为结构化数据帧在 spark 中实际上非常简单(请参阅: http://spark.apache.org/docs/latest/sql-programming-guide。 html#json-数据集)
例如: