在kafka中处理数据流

dohp0rv5  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(427)

我正在尝试设置kafka使用者来处理来自kafka流的数据。我能够设置到流的连接,数据是可见的,但它是特殊字符和ascii的混合体。
我正在使用内置的Kafka控制台,但也尝试了python版本的合流Kafka。唯一需要遵循的参数是使用带有scram-sha-256的sasl\ u明文安全协议。我也愿意使用其他方法来解析输出(如果可能的话,不用java)。
Kafka控制台

bin/kafka-console-consumer.sh --bootstrap-server server:9092 \
--topic TOPIC --from-beginning --consumer.config=consumer.properties

合流KafkaPython

topics = "TOPIC"
conf = {
        "bootstrap.servers": "server:9092",
        "group.id": "group",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms" : "SCRAM-SHA-256",
}
c = Consumer(conf)
c.subscribe([topics])
running = True
while running:
        message = c.poll()
        print(message.value())
c.close()

输出

PLE9K1PKH3S0MAY38ChangeRequest : llZYMEgVmq2CHG:Infra RequestKSUSMAINCHANGEKC-10200-FL01DATA_MISSINGCHGUSD
DATA_MISSINGDATA_MISSINGUSD
CANCEL

▒▒12SLM:Measurement"Schedule(1 = 0)USDUSD▒▒▒
                                                              l▒l▒V?▒▒▒
                                                                       llZYMEgVmq
company_team team_nameTEAM###SGP000000140381PPL000002020234
Latha M▒>▒>▒ChangeRequest
hello:1234543534 cloud abcdef▒▒▒
                                                         ▒Ի▒
                                                            ▒▒▒
                                                               John Smithjs12345SGP000000140381▒NPPL000002020234
▒Ի▒

我最初试图解析标准输出上的数据,但最后的期望是在数据库中获得解析后的数据。如有任何建议,将不胜感激。

njthzxwz

njthzxwz1#

你的信息好像是用二进制编码的。要打印它们,你需要设置一个二进制解码器,并通过它传递它们。如果您使用特定的模式生成了它们,您可能还需要使用包含给定主题的模式的模式注册表来反序列化对象。你看到的是:

message_bytes = io.BytesIO(message.value())
decoder = BinaryDecoder(message_bytes)
a2mppw5e

a2mppw5e2#

正如jaivalis所提到的,您的生产者和您用来接收数据的消费者之间似乎不匹配。kafka streams公开了两个属性,用于控制通过拓扑的数据的序列化和反序列化;default.value.serde、default.key.serde。我建议您检查streams应用程序的配置,以找到合适的反序列化程序供使用者使用。
https://kafka.apache.org/documentation/#streamsconfigs
但是请注意,这些serde可能会被您的streams应用程序实现覆盖。请务必检查您的实现,以确保找到正确的序列化格式。
https://kafka.apache.org/21/documentation/streams/developer-guide/datatypes.html#overriding-默认serdes

相关问题