在通常的结构化\u kafka \u wordcount.py代码中,
当我把一行行字 udf
如下图所示,
my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))
words = lines.select(
explode(
my_split(lines.value)
)
)
警告将继续显示:
警告cachedkafkaconsumer:cachedkafkaconsumer未在不间断线程中运行。当cachedkafkaconsumer的方法因kafka-1894而中断时,它可能会挂起
另一方面,当我用 pyspark.sql.functions.split
,一切正常。
words = lines.select(
explode(
split(lines.value, ' ')
)
)
为什么会发生这种情况以及如何修复警告?
这是我在实践中尝试执行的代码:
pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)
def _unfold(x):
ret = []
result = prog.match(x)
if result:
log = " ".join((result.group(1), result.group(3)))
times = result.group(2)
for _ in range(int(times)):
ret.append(log)
else:
ret.append(x)
return ret
_udf = udf(lambda x: _unfold(x), ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))
1条答案
按热度按时间p8ekf7hl1#
除了拒绝pythonudfs*之外,您在代码中对此问题无能为力。正如您在异常消息中看到的
UninterruptibleThread
是对kafka bug(kafka-1894)的一种解决方法,设计用于在中断时防止无限循环KafkaConsumer
.它不用于
PythonUDFRunner
(在那里引入特殊情况可能没有意义)。我个人不会担心的,除非你遇到一些相关的问题。python代码永远不会直接与
KafkaConsumer
. 如果您遇到任何问题,应该有固定的上游-在这种情况下,我建议创建一个jira票。unfold
函数可以用sql函数重写,但这将是一种黑客行为。将邮件计数添加为整数:用它来
explode
```exploded = lines_with_count.withColumn(
"i",
expr("explode(split(repeat('1', message_count - 1),''))")
).drop("message_count", "i")
exploded.withColumn(
"value",
when(
col("value").rlike(p),
concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
).otherwise(col("value"))).show(4, False)
+------------------+
|value |
+------------------+
|asd 12 |
|asd 12 |
|asd 12 |
|some other message|
+------------------+