将 Pandas 数据帧转换为 Spark 数据帧错误

新手上路,请多包涵

我正在尝试将 Pandas DF 转换为 Spark one。 DF头:

 10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691

代码:

 dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)

我得到了一个错误:

 TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

原文由 Ivan Sudos 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.2k
2 个回答

您需要确保您的 pandas 数据框列适合 spark 正在推断的类型。如果您的 pandas 数据框列出如下内容:

 pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol                    5062 non-null object
Col2                       5062 non-null object

你得到那个错误尝试:

 df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)

现在,确保 .astype(str) 实际上是您希望这些列成为的类型。基本上,当底层 Java 代码试图从 python 中的对象推断类型时,它会使用一些观察结果并进行猜测,如果该猜测不适用于列中的所有数据,它会尝试从 pandas 转换为火花它会失败。

原文由 madman2890 发布,翻译遵循 CC BY-SA 3.0 许可协议

我制作了这个脚本,它适用于我的 10 个 pandas 数据框

from pyspark.sql.types import *

# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return DoubleType()
    elif f == 'float32': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)

你也可以在这个 要点 中看到它

有了这个你只需要打电话 spark_df = pandas_to_spark(pandas_df)

原文由 Gonzalo Garcia 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题