给定示例 Dataframe :
+---+---------------+
| id| log|
+---+---------------+
| 1|Test logX blk_A|
| 2|Test logV blk_B|
| 3|Test logF blk_D|
| 4|Test logD blk_F|
| 5|Test logB blk_K|
| 6|Test logY blk_A|
| 7|Test logE blk_C|
+---+---------------+
我试图通过比较日志和一个块列表(或者df列,我可以很容易地转换它)来标记它,这意味着我需要扫描这个列表中的每个logLine并添加label列。
给定列表:
anomalous_blocks = ['blk_A','blk_C','blk_D']
预期结果 Dataframe 为:
+---+---------------+-----+
| id| log|Label|
+---+---------------+-----+
| 1|Test logX blk_A| True|
| 2|Test logV blk_B|False|
| 3|Test logF blk_D| True|
| 4|Test logD blk_F|False|
| 5|Test logB blk_K|False|
| 6|Test logY blk_A| True|
| 7|Test logE blk_C| True|
+---+---------------+-----+
我试图在SQL或Spark中思考和寻找一个可以完成这一任务的解决方案,但结果很短。
我想使用一个udf(用户定义函数),如下所示:
from pyspark.sql.functions import udf
def check_anomaly(text, anomalies):
for a in anomalies:
if a in text:
return True
return False
anomaly_matchUDF = udf(lambda x,y:check_anomaly(x,y))
但这需要太长的时间,似乎不是正确的方式去做这件事。
如有任何建议,我们将不胜感激。
- 编辑:**
为了清楚起见,列表的大小远小于行/日志的数量。换句话说,给定N个日志行和标记为异常的M个块的列表
N〉〉M
- 编辑2:**
更新df以更准确地表示真实情况
2条答案
按热度按时间tpgth1q71#
您可以使用
like
或contains
操作符,并使用reduce
创建条件链。niwlg2el2#
您可以在
pyspark.sql.Column
上使用isin
方法来实现这一点,而不需要UDF(请注意,我稍微修改了anomalous_blocks
列表的内容,以便与df的内容完全匹配。由于您使用的是N >> M
,因此这应该非常便宜):