使用 PySpark 将 JSON 文件读取为 Pyspark Dataframe?

新手上路,请多包涵

如何使用 PySpark 读取以下 JSON 结构以激发数据帧?

我的 JSON 结构

{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}

我试过:

 df = spark.read.json('simple.json');

我希望输出 a、b、c 作为列,值作为相应的行。

谢谢。

原文由 Karthik Mannava 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 753
2 个回答

Json 字符串变量

如果你有 json 字符串作为变量, 那么你可以这样做

simple_json = '{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}'
rddjson = sc.parallelize([simple_json])
df = sqlContext.read.json(rddjson)

from pyspark.sql import functions as F
df.select(F.explode(df.results).alias('results')).select('results.*').show(truncate=False)

这会给你

+---+---+----+
|a  |b  |c   |
+---+---+----+
|1  |2  |name|
|2  |5  |foo |
+---+---+----+

Json 字符串作为文件中的单独行(sparkContext 和 sqlContext)

如果你 在文件中有 json 字符串作为单独的行, 那么你可以 使用 sparkContext 读取它到 rdd[string] 如上所述,其余过程与上面相同

rddjson = sc.textFile('/home/anahcolus/IdeaProjects/pythonSpark/test.csv')
df = sqlContext.read.json(rddjson)
df.select(F.explode(df['results']).alias('results')).select('results.*').show(truncate=False)

Json 字符串作为文件中的单独行(仅限 sqlContext)

如果您将 json 字符串作为文件中的单独行, 那么您只能使用 sqlContext 。但是这个过程很复杂,因为 你必须为它创建模式

df = sqlContext.read.text('path to the file')

from pyspark.sql import functions as F
from pyspark.sql import types as T
df = df.select(F.from_json(df.value, T.StructType([T.StructField('results', T.ArrayType(T.StructType([T.StructField('a', T.IntegerType()), T.StructField('b', T.IntegerType()), T.StructField('c', T.StringType())])))])).alias('results'))
df.select(F.explode(df['results.results']).alias('results')).select('results.*').show(truncate=False)

这应该给你与上面相同的结果

我希望这个答案有帮助

原文由 Ramesh Maharjan 发布,翻译遵循 CC BY-SA 3.0 许可协议

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import explode

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
json_data = '{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}'
json_rdd = sc.parallelize([json_data])
df = spark.read.json(json_rdd)
df =df.withColumn("results", explode(df.results)).select(
                         col("results.a").alias("a"),
                         col("results.b").alias("b"),
                         col("results.c").alias("c") )
df.show()

原文由 Kaustuv 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题