使用下面的代码将pcappackes传递到队列中,是否可以将其传递到kafka队列中,以便kafka消费者可以从kafka producer中提取pcappackes?
StringBuilder errbuf = new StringBuilder();
Pcap pcap = Pcap.openOffline("tests/test-afs.pcap", errbuf);
PcapPacketHandler<Queue<PcapPacket>> handler = new PcapPacketHandler<Queue<PcapPacket>>() {
public void nextPacket(PcapPacket packet, Queue<PcapPacket> queue) {
PcapPacket permanent = new PcapPacket(packet);
queue.offer(packet);
}
}
Queue<PcapPacket> queue = new ArrayBlockingQueue<PcapPacket>();
pcap.loop(10, handler, queue);
System.out.println("we have " + queue.size() + " packets in our queue");
pcap.close();
2条答案
按热度按时间b5lpy0ml1#
虽然我参加聚会迟到了,但如果有类似需求的人发现它有用,我还是在这里分享我的工具:pcap处理器(githuburl)。我已经为我的研究开发了一个python工具来读取原始pcap文件,处理它们并将它们提供给我的流处理器。因为我尝试了各种流协议,所以我在这个工具中实现了所有这些协议。当前支持的接收器:
csv文件
apache kafka(编码为json字符串)
http rest(json)
grpc公司
控制台(只需打印到终端)
例如,阅读
input.pcap
要将其发送到kafka主题,需要在kafka_sink.py中调整引导端点和主题名称。然后,从父目录执行以下命令将读取文件并将数据包发送到kafka队列。有关更多详细信息和安装说明,请查看github自述,如果遇到任何问题,请随时打开github问题。
eufgjt7s2#
kafka支持将任意二进制数据存储为消息。在您的例子中,您只需要提供一个pcappacket类二进制序列化程序(以及用于读取的反序列化程序)。
有关示例,请参见kafka:编写自定义序列化程序。