如何连续地向kafka发送数据?

pxyaymoc  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(456)

这个问题在这里已经有答案了

如何连续地向Kafka提供嗅探到的数据包(4个答案)
四年前关门了。
我正在尝试连续地向kafka代理/消费者发送数据(使用tshark嗅探的数据包)。
以下是我遵循的步骤:
1启动zookeeper:

kafka/bin/zookeeper-server-start.sh ../kafka//config/zookeeper.properties

2已启动kafka服务器:

kafka/bin/kafka-server-start.sh ../kafka/config/server.properties

三。Kafka消费者:

kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic \
                                           'my-topic' --from-beginning

4编写了以下python脚本以将嗅探到的数据发送给使用者:

from kafka import KafkaProducer
import subprocess
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('my-topic', subprocess.check_output(['tshark','-i','wlan0']))

但这是停留在procuder终端和输出:

Capturing on 'wlan0'
605
^C

任何东西都不会转移到消费者身上。
我知道我可以用 pyshark 要在python上实现tshark:

import pyshark
capture = pyshark.LiveCapture(interface='eth0')
capture.sniff(timeout=5)
capture1=capture[0]
print capture1

但我不知道如何连续地将捕获的数据包从生产者发送到消费者。有什么建议吗?
谢谢您!

xpcnnkqh

xpcnnkqh1#

检查以下链接。
http://zdatainc.com/2014/07/real-time-streaming-apache-storm-apache-kafka/
在这里实现kafka生产者,定义了用于测试集群的kafka生产者代码的主要部分。在main类中,我们设置数据管道和线程:

LOGGER.debug("Setting up streams");
PipedInputStream send = new PipedInputStream(BUFFER_LEN);
PipedOutputStream input = new PipedOutputStream(send);

LOGGER.debug("Setting up connections");
LOGGER.debug("Setting up file reader");
BufferedFileReader reader = new BufferedFileReader(filename, input);
LOGGER.debug("Setting up kafka producer");
KafkaProducer kafkaProducer = new KafkaProducer(topic, send);

LOGGER.debug("Spinning up threads");
Thread source = new Thread(reader);
Thread kafka = new Thread(kafkaProducer);

source.start();
kafka.start();

LOGGER.debug("Joining");
kafka.join();
The BufferedFileReader in its own thread reads off the data from disk:
rd = new BufferedReader(new FileReader(this.fileToRead));
wd = new BufferedWriter(new OutputStreamWriter(this.outputStream, ENC));
int b = -1;
while ((b = rd.read()) != -1)
{
    wd.write(b);
}
Finally, the KafkaProducer sends asynchronous messages to the Kafka Cluster:
rd = new BufferedReader(new InputStreamReader(this.inputStream, ENC));
String line = null;
producer = new Producer<Integer, String>(conf);
while ((line = rd.readLine()) != null)
{
    producer.send(new KeyedMessage<Integer, String>(this.topic, line));
}
Doing these operations on separate threads gives us the benefit of having disk reads not block the Kafka producer or vice-versa, enabling maximum performance tunable by the size of the buffer.
Implementing the Storm Topology
Topology Definition
Moving onward to Storm, here we define the topology and how each bolt will be talking to each other:
TopologyBuilder topology = new TopologyBuilder();

topology.setSpout("kafka_spout", new KafkaSpout(kafkaConf), 4);

topology.setBolt("twitter_filter", new TwitterFilterBolt(), 4)
        .shuffleGrouping("kafka_spout");

topology.setBolt("text_filter", new TextFilterBolt(), 4)
        .shuffleGrouping("twitter_filter");

topology.setBolt("stemming", new StemmingBolt(), 4)
        .shuffleGrouping("text_filter");

topology.setBolt("positive", new PositiveSentimentBolt(), 4)
        .shuffleGrouping("stemming");
topology.setBolt("negative", new NegativeSentimentBolt(), 4)
        .shuffleGrouping("stemming");

topology.setBolt("join", new JoinSentimentsBolt(), 4)
        .fieldsGrouping("positive", new Fields("tweet_id"))
        .fieldsGrouping("negative", new Fields("tweet_id"));

topology.setBolt("score", new SentimentScoringBolt(), 4)
        .shuffleGrouping("join");

topology.setBolt("hdfs", new HDFSBolt(), 4)
        .shuffleGrouping("score");
topology.setBolt("nodejs", new NodeNotifierBolt(), 4)
        .shuffleGrouping("score");

值得注意的是,数据会被随机移动到每个螺栓,直到连接时除外,因为非常重要的一点是,相同的tweet会被赋予连接螺栓的相同示例。

相关问题