我在 Hive 中加载了一个非常大的数据集(大约 190 万行和 1450 列)。我需要确定每一列的“覆盖率”,即每列具有非 NaN 值的行的比例。
这是我的代码:
from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string
sc = SparkContext(appName="compute_coverages") ## Create the context
sqlContext = HiveContext(sc)
df = sqlContext.sql("select * from data_table")
nrows_tot = df.count()
covgs = sc.parallelize(df.columns)
.map(lambda x: str(x))
.map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))
在 PySpark shell 中尝试这个,如果我然后执行 covgs.take(10)
,它会返回一个相当大的错误堆栈。它说保存文件 /usr/lib64/python2.6/pickle.py
时出现问题。这是错误的最后一部分:
py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342) at py4j.Gateway.invoke(Gateway.java:252) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
有没有更好的方法来完成这个?不过,我不能使用 pandas,因为它目前在我工作的集群上不可用,而且我没有安装它的访问权限。
原文由 RKD314 发布,翻译遵循 CC BY-SA 4.0 许可协议
让我们从一个虚拟数据开始:
您只需要一个简单的聚合:
或者如果你想治疗
NaN
aNULL
:您还可以利用 SQL
NULL
语义来实现相同的结果,而无需创建自定义函数:但这不适用于
NaNs
。如果您更喜欢分数:
或者
斯卡拉等效: