import kafka.serializer.Encoder
import kafka.utils.VerifiableProperties
import org.jnetpcap.packet.PcapPacket
class PcapEncoder(verifiableProperties: VerifiableProperties) extends Encoder[PcapPacket] { override def toBytes(customMessage: PcapPacket): Array[Byte] = customMessage.transferStateAndDataFrom(PcapPacket : Array[Byte]) }
`这是我编写的编码器,用于对使用jnetpcap库捕获的数据包进行编码,以将其传递给kafka消费者。但我有错误,这是实现编码器的方法吗?
1条答案
按热度按时间pkmbmrz71#
假设您已经安装并运行了kafka,您将使用其中一个producer接口(可能是kafka.javaapi.producer.producer)建立到kafka到特定主题的连接。然后,当您获得捕获的数据包时,您将其传递给api。Kafka没有原始字节的问题。完成后,通过该api关闭连接。