我是Kafka的新人。我有一台linux机器,端口号2552从外部服务器获取数据流。我想使用kafka producer来监听这个端口,并将数据流传输到一个主题。
esyap4oy1#
您不必说明端口2552上的通信量是tcp还是udp,但一般来说,您可以轻松编写一个程序,在该端口上侦听数据,将接收到的数据解析为离散消息,然后使用kafka producer api将数据作为kafka消息(带或不带密钥)发布到kafka主题。在某些情况下,现有的开放源代码可能已经为您完成了这项工作,因此您不需要从头开始编写。如果端口2552协议是众所周知的协议,例如在iana中注册的tcp或udp呼叫记录协议(请参阅ftp://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt)然后甚至可能有一个现有的Kafka连接器或代理支持它。在github上搜索kafka connect-[协议]或查看https://www.confluent.io/product/connectors/甚至可能有一个通用的tcp或udp连接器,您可以使用它作为参考,为您尝试接收的特定协议配置或构建自己的连接器。
wgx48brx2#
这是一个完整的hack,但适用于沙盒示例:
nc -l 2552 | ./bin/kafka-console-producer --broker-list localhost:9092 --topic test_topic
它使用netcat监听tcp端口,并将接收到的任何内容通过管道传送到kafka主题。一个快速的谷歌也发现了这个问题https://github.com/dhanuka84/kafka-connect-tcp 它看起来做了类似的事情,但更强大,使用Kafka连接api。
2条答案
按热度按时间esyap4oy1#
您不必说明端口2552上的通信量是tcp还是udp,但一般来说,您可以轻松编写一个程序,在该端口上侦听数据,将接收到的数据解析为离散消息,然后使用kafka producer api将数据作为kafka消息(带或不带密钥)发布到kafka主题。
在某些情况下,现有的开放源代码可能已经为您完成了这项工作,因此您不需要从头开始编写。如果端口2552协议是众所周知的协议,例如在iana中注册的tcp或udp呼叫记录协议(请参阅ftp://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt)然后甚至可能有一个现有的Kafka连接器或代理支持它。在github上搜索kafka connect-[协议]或查看https://www.confluent.io/product/connectors/
甚至可能有一个通用的tcp或udp连接器,您可以使用它作为参考,为您尝试接收的特定协议配置或构建自己的连接器。
wgx48brx2#
这是一个完整的hack,但适用于沙盒示例:
它使用netcat监听tcp端口,并将接收到的任何内容通过管道传送到kafka主题。
一个快速的谷歌也发现了这个问题https://github.com/dhanuka84/kafka-connect-tcp 它看起来做了类似的事情,但更强大,使用Kafka连接api。