我有一个Kafka主题客户,其中我流式传输了一个csv文件。现在有没有可能在Kafka的基础上转换数据
计数。例如,如果计数小于20,则将其发送到主题a;如果计数大于20,则将其发送到主题b
我是新来Kafka,我试图这样,但它不工作
builder.stream("Customer")
.groupByKey()
.count()
.toStream()
.filter((k,v)-> String(v) > 20)
.to("test_A");
这个代码是错误的,我很确定,但请任何人能帮我一下吗
3条答案
按热度按时间qhhrdooz1#
您现在要做的是根据一个条件仅将一部分记录发送到一个主题,而放弃其他主题。如果要将一部分发送到一个主题,将另一部分发送到另一个主题,则应使用分支运算符。
像这样:
另一件事你应该注意的是比较必须在数字之间,而不是字符串之间,因为当你对字符串使用更大的比较器时,你是在比较字符串的长度和字典顺序
hfwmuf9z2#
简单的解决方案是创建一个producer,一次从输入的csv文件中读取一行,并根据计数条件将其发送到特定的主题。
你可以跟着https://towardsdatascience.com/kafka-python-explained-in-10-lines-of-code-800e3e07dad1 为了便于理解Kafka和python
klh5stk13#