我有一个具有以下结构的 Spark 数据框。 bodyText_token 具有标记(已处理/单词集)。我有一个已定义关键字的嵌套列表
root
|-- id: string (nullable = true)
|-- body: string (nullable = true)
|-- bodyText_token: array (nullable = true)
keyword_list=[['union','workers','strike','pay','rally','free','immigration',],
['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']]
我需要检查每个关键字列表下有多少标记,并将结果添加为现有数据框的新列。例如:如果 tokens =["become", "farmer","rally","workers","student"]
结果将是 -> [1,2,0]
以下功能按预期工作。
def label_maker_topic(tokens,topic_words):
twt_list = []
for i in range(0, len(topic_words)):
count = 0
#print(topic_words[i])
for tkn in tokens:
if tkn in topic_words[i]:
count += 1
twt_list.append(count)
return twt_list
我在 withColumn
下使用 udf 来访问该函数,但出现错误。我认为这是关于将外部列表传递给 udf。有没有一种方法可以将外部列表和数据框列传递给 udf 并向我的数据框添加一个新列?
topicWord = udf(label_maker_topic,StringType())
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list))
原文由 Jay 发布,翻译遵循 CC BY-SA 4.0 许可协议
最干净的解决方案是使用闭包传递额外的参数:
这不需要对
keyword_list
或您用 UDF 包装的函数进行任何更改。您还可以使用此方法传递任意对象。这可用于传递例如sets
的列表以进行高效查找。如果您想使用当前的 UDF 并直接传递
topic_words
,您必须先将其转换为列文字:根据您的数据和要求,可以选择更有效的解决方案,这些解决方案不需要 UDF(分解 + 聚合 + 折叠)或查找(散列 + 矢量操作)。