udf cause警告:cachedkaffkaconsumer未在不间断线程中运行(kafka-1894)

vsikbqxv  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(556)

在通常的结构化\u kafka \u wordcount.py代码中,
当我把一行行字 udf 如下图所示,

my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))

words = lines.select(
    explode(
        my_split(lines.value)
    )
)

警告将继续显示:
警告cachedkafkaconsumer:cachedkafkaconsumer未在不间断线程中运行。当cachedkafkaconsumer的方法因kafka-1894而中断时,它可能会挂起
另一方面,当我用 pyspark.sql.functions.split ,一切正常。

words = lines.select(
    explode(
        split(lines.value, ' ') 
    ) 
)

为什么会发生这种情况以及如何修复警告?
这是我在实践中尝试执行的代码:

pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)

def _unfold(x):
    ret = []
    result = prog.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        times = result.group(2)
        for _ in range(int(times)):
            ret.append(log)
    else:
        ret.append(x)

    return ret

_udf = udf(lambda x: _unfold(x), ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))
p8ekf7hl

p8ekf7hl1#

除了拒绝pythonudfs*之外,您在代码中对此问题无能为力。正如您在异常消息中看到的 UninterruptibleThread 是对kafka bug(kafka-1894)的一种解决方法,设计用于在中断时防止无限循环 KafkaConsumer .
它不用于 PythonUDFRunner (在那里引入特殊情况可能没有意义)。
我个人不会担心的,除非你遇到一些相关的问题。python代码永远不会直接与 KafkaConsumer . 如果您遇到任何问题,应该有固定的上游-在这种情况下,我建议创建一个jira票。

  • 你的 unfold 函数可以用sql函数重写,但这将是一种黑客行为。将邮件计数添加为整数:
from pyspark.sql.functions import concat_ws, col, expr, coalesce, lit, regexp_extract, when

p = "(.+) message repeated (\\d) times: \\[ (.+)\\]"

lines = spark.createDataFrame(
    ["asd message repeated 3 times: [ 12]", "some other message"], "string"
)

lines_with_count = lines.withColumn(
   "message_count", coalesce(regexp_extract("value", p, 2).cast("int"), lit(1)))

用它来 explode ```
exploded = lines_with_count.withColumn(
"i",
expr("explode(split(repeat('1', message_count - 1),''))")
).drop("message_count", "i")

并摘录:

exploded.withColumn(
"value",
when(
col("value").rlike(p),
concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
).otherwise(col("value"))).show(4, False)

+------------------+

|value |

+------------------+

|asd 12 |

|asd 12 |

|asd 12 |

|some other message|

+------------------+

相关问题