我正在处理一个相当大的 Dataframe (大约10万行,打算达到1000万行),它具有以下结构:
+------+--------------------+--------+--------------------+-------------------+
|LineId| Content| EventId| EventTemplate| Timestamp|
+------+--------------------+--------+--------------------+-------------------+
| 1|Receiving block b...|ef6f4915|Receiving block <...|2009-11-08 20:35:18|
| 2|BLOCK* NameSystem...|9bc09482|BLOCK* NameSystem...|2009-11-08 20:35:18|
| 3|Receiving block b...|9ca53bce|Receiving block <...|2009-11-08 20:35:19|
+------+--------------------+--------+--------------------+-------------------+
我想添加一个标签,我使用以下函数来完成此操作:
from functools import reduce
label_condition = reduce(lambda a, b: a|b, (df['Content'].like('%'+pat+"%") for pat in blocks))
其中blocks
是一个列表,包含定义行是否异常的块(我们称之为令牌)。该函数检查Content
字段是否包含blocks
列表的任何值。
这个列表的大小大约是17k,我认为这就是导致这个问题的原因。
当我尝试将其添加到 Dataframe ,或者只是或评估此操作时,我收到以下错误:
Py4JJavaError: An error occurred while calling o50581.toString.
: java.lang.StackOverflowError
at org.apache.spark.sql.catalyst.util.package$$anonfun$usePrettyExpression$1.applyOrElse(package.scala:128)
at org.apache.spark.sql.catalyst.util.package$$anonfun$usePrettyExpression$1.applyOrElse(package.scala:128)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
...
我上网看了看,Spark的一个过于复杂的计划的执行可能会有问题,或者使用检查点来避免这类事情,但我不知道该如何去做。我尝试在评估之前添加一个检查点,我也尝试使用一个select来减少df到"Content"列,但没有效果。
我在Scala中找到this solution来优化reduce函数,但我不知道如何将其翻译为Python。
有没有一种方法可以优化它,或者使它至少一步一步地或迭代地进行,以避免堆栈溢出?
先谢了。
1条答案
按热度按时间c9qzyr3d1#
您可以尝试使用接受正则表达式的
rlike
方法-将正则表达式模式作为'element1|element2|...'
传递。