Pyspark:解析一列json字符串

新手上路,请多包涵

我有一个由一列组成的 pyspark 数据框,名为 json ,其中每一行都是 json 的 unicode 字符串。我想解析每一行并返回一个新的数据框,其中每一行都是解析后的 json。

 # Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])

我尝试使用 json.loads 映射每一行:

 (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()

但这会返回一个 TypeError: expected string or buffer

我怀疑部分问题是当从 dataframe 转换为 rdd 时,架构信息丢失,所以我也尝试手动输入架构信息:

 schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

但我得到相同的 TypeError

看看 这个答案,看起来用 flatMap 可能在这里很有用,但我也没有成功:

 schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

我收到此错误: AttributeError: 'unicode' object has no attribute 'get'

原文由 Steve 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 913
2 个回答

如果您之前将数据帧转换为字符串的 RDD,则将具有 json 字符串的数据帧转换为结构化数据帧在 spark 中实际上非常简单(请参阅: http://spark.apache.org/docs/latest/sql-programming-guide。 html#json-数据集

例如:

 >>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)

原文由 Mariusz 发布,翻译遵循 CC BY-SA 3.0 许可协议

对于 Spark 2.1+ ,您可以使用 from_json 它允许在数据框中保留其他非 json 列,如下所示:

 from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))

您让 Spark 派生 json 字符串列的架构。然后 df.json 列不再是StringType,而是正确解码的json结构,即嵌套 StrucTypedf 9452fd9e2dbe0e6f22d0bc03bd0-46的所有其他列都保留了是。

您可以通过以下方式访问 json 内容:

 df.select(col('json.header').alias('header'))

原文由 Martin Tapp 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
logo
Stack Overflow 翻译
子站问答
访问
宣传栏