我有一个用java编写的kafka生产者代码,可以编写kafka消息。以及接收这些消息的用户代码。是否可以将消费者收到的消息写入java中的任何文本文件。
2izufjch1#
谢谢各位,我能做到。一旦用户端接收到数据,就必须编写一个普通的java代码。下面是ode中将消息打印到控制台的行。
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
您可以将所有消息存储到字符串中,并一次将所有消息打印到文件中。
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); completMessage += new String(bytes, "UTF-8")+"\n"; ``` `new String(bytes, "UTF-8")+"\n";` 包含实际消息。 最后将所有消息打印到文件中。
writeDataToFile(completMessage);``` writeDataToFile 包含将字符串写入文件的简单java代码。谢谢您。
writeDataToFile
bbuxkriu2#
这是可能的。下面是这个的工作代码。
package com.venk.prac; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.util.Collections; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import kafka.utils.ShutdownableThread; public class FileConsumer extends ShutdownableThread { private final KafkaConsumer<Integer, String> kafkaConsumer; private String topic; private String filePath; private BufferedWriter buffWriter; public FileConsumer(String topic, String filePath) { super("FileConsumer", false); Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_BROKER_SERVERS_PORTS_STRING); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FileConsumer"); properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); kafkaConsumer = new KafkaConsumer<Integer, String>(properties); this.topic = topic; this.filePath = filePath; try { this.buffWriter = new BufferedWriter(new FileWriter(filePath)); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void doWork() { // TODO Auto-generated method stub kafkaConsumer.subscribe(Collections.singletonList(this.topic)); ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000); try { for (ConsumerRecord<Integer, String> record : consumerRecords) buffWriter.write(record.value() + System.lineSeparator()); buffWriter.flush(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public String name() { // TODO Auto-generated method stub return null; } @Override public boolean isInterruptible() { return false; } }
lokaqttq3#
如果您正在编写自己的使用者,则应该在同一应用程序中包含写入文件的逻辑。使用预先打包的console consumer,您可以通过管道将其传输到一个文件。例如: kafka-console-consumer > file.txt 另一个(无代码)选项是尝试streamsetsdatacollector,这是一个开放源码的apache许可工具,它也有一个拖放ui。它包括Kafka和各种数据格式的内置连接器。
kafka-console-consumer > file.txt
3条答案
按热度按时间2izufjch1#
谢谢各位,
我能做到。一旦用户端接收到数据,就必须编写一个普通的java代码。
下面是ode中将消息打印到控制台的行。
您可以将所有消息存储到字符串中,并一次将所有消息打印到文件中。
writeDataToFile(completMessage);
```
writeDataToFile
包含将字符串写入文件的简单java代码。谢谢您。
bbuxkriu2#
这是可能的。下面是这个的工作代码。
lokaqttq3#
如果您正在编写自己的使用者,则应该在同一应用程序中包含写入文件的逻辑。使用预先打包的console consumer,您可以通过管道将其传输到一个文件。例如:
kafka-console-consumer > file.txt
另一个(无代码)选项是尝试streamsetsdatacollector,这是一个开放源码的apache许可工具,它也有一个拖放ui。它包括Kafka和各种数据格式的内置连接器。