相对于其他列或列表的Pyspark扫描列

yks3o0rb  于 2023-01-01  发布在  Spark
关注(0)|答案(2)|浏览(130)

给定示例 Dataframe :

  1. +---+---------------+
  2. | id| log|
  3. +---+---------------+
  4. | 1|Test logX blk_A|
  5. | 2|Test logV blk_B|
  6. | 3|Test logF blk_D|
  7. | 4|Test logD blk_F|
  8. | 5|Test logB blk_K|
  9. | 6|Test logY blk_A|
  10. | 7|Test logE blk_C|
  11. +---+---------------+

我试图通过比较日志和一个块列表(或者df列,我可以很容易地转换它)来标记它,这意味着我需要扫描这个列表中的每个logLine并添加label列。
给定列表:

  1. anomalous_blocks = ['blk_A','blk_C','blk_D']

预期结果 Dataframe 为:

  1. +---+---------------+-----+
  2. | id| log|Label|
  3. +---+---------------+-----+
  4. | 1|Test logX blk_A| True|
  5. | 2|Test logV blk_B|False|
  6. | 3|Test logF blk_D| True|
  7. | 4|Test logD blk_F|False|
  8. | 5|Test logB blk_K|False|
  9. | 6|Test logY blk_A| True|
  10. | 7|Test logE blk_C| True|
  11. +---+---------------+-----+

我试图在SQL或Spark中思考和寻找一个可以完成这一任务的解决方案,但结果很短。
我想使用一个udf(用户定义函数),如下所示:

  1. from pyspark.sql.functions import udf
  2. def check_anomaly(text, anomalies):
  3. for a in anomalies:
  4. if a in text:
  5. return True
  6. return False
  7. anomaly_matchUDF = udf(lambda x,y:check_anomaly(x,y))

但这需要太长的时间,似乎不是正确的方式去做这件事。
如有任何建议,我们将不胜感激。

    • 编辑:**

为了清楚起见,列表的大小远小于行/日志的数量。换句话说,给定N个日志行和标记为异常的M个块的列表
N〉〉M

    • 编辑2:**

更新df以更准确地表示真实情况

tpgth1q7

tpgth1q71#

您可以使用likecontains操作符,并使用reduce创建条件链。

  1. anomalous_blocks = ['blk_A','blk_C','blk_D']
  2. label_condition = reduce(lambda a, b: a | b,
  3. [func.col('log').like('%'+k+'%') for k in anomalous_blocks]
  4. )
  5. # Column<'((log LIKE %blk_A% OR log LIKE %blk_C%) OR log LIKE %blk_D%)'>
  6. data_sdf. \
  7. withColumn('label', label_condition). \
  8. show()
  9. # +---+---------------+-----+
  10. # | id| log|label|
  11. # +---+---------------+-----+
  12. # | 1|Test logX blk_A| true|
  13. # | 2|Test logV blk_B|false|
  14. # | 3|Test logF blk_D| true|
  15. # | 4|Test logD blk_F|false|
  16. # | 5|Test logB blk_K|false|
  17. # | 6|Test logY blk_A| true|
  18. # | 7|Test logE blk_C| true|
  19. # +---+---------------+-----+
展开查看全部
niwlg2el

niwlg2el2#

您可以在pyspark.sql.Column上使用isin方法来实现这一点,而不需要UDF(请注意,我稍微修改了anomalous_blocks列表的内容,以便与df的内容完全匹配。由于您使用的是N >> M,因此这应该非常便宜):

  1. df = spark.createDataFrame(
  2. [
  3. (1, "Test log blk_A"),
  4. (2, "Test log blk_B"),
  5. (3, "Test log blk_D"),
  6. (4, "Test log blk_F"),
  7. (5, "Test log blk_K"),
  8. (6, "Test log blk_A"),
  9. (7, "Test log blk_C")
  10. ],
  11. ["id", "log"]
  12. )
  13. anomalous_blocks = ['blk_A','blk_C','blk_D']
  14. # Solution starts here
  15. adapted_anomalous_blocks = ["Test log " + x for x in anomalous_blocks]
  16. output = df.withColumn("Label", df.log.isin(adapted_anomalous_blocks))
  17. output.show()
  18. +---+--------------+-----+
  19. | id| log|Label|
  20. +---+--------------+-----+
  21. | 1|Test log blk_A| true|
  22. | 2|Test log blk_B|false|
  23. | 3|Test log blk_D| true|
  24. | 4|Test log blk_F|false|
  25. | 5|Test log blk_K|false|
  26. | 6|Test log blk_A| true|
  27. | 7|Test log blk_C| true|
  28. +---+--------------+-----+
展开查看全部

相关问题