I am trying to read a 100k-record file and send it to a Kafka topic. Below is my Kafka code, which sends the data to the Kafka console consumer. But what I receive on the consumer side is this:
java.util.stream.ReferencePipeline$Head@e9e54c2
Here is an example of a single record that I am sending:
173|172686|548247079|837113012|0x548247079f|7|173|172686a|0|173|2059 22143|0|173|1|173|172686|||0|||7|0||7|||7|172686|allowAllServices|?20161231:22143|548247079||0|173||172686|5:2266490827:DCCInter;20160905152146;2784
Any suggestions on how to get the data shown above on the consumer side... Thanks.
Code:
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
@SuppressWarnings("unused")
public class HundredKRecords {

    private static String sCurrentLine;

    public static void main(String args[]) throws InterruptedException, ExecutionException {
        String fileName = "/Users/sreeeedupuganti/Downloads/octfwriter.txt";

        // read file into stream, try-with-resources
        try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
            stream.forEach(System.out::println);
            kafka(stream.toString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void kafka(String stream) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        producer.send(new KeyedMessage<String, String>("test", stream));
        producer.close();
    }
}
1 Answer
The problem is in this line:
kafka(stream.toString());
The Java Stream class does not override the toString() method. By default it returns getClass().getName() + '@' + Integer.toHexString(hashCode()), which is exactly what you are receiving. To send the whole file to Kafka, you have to convert it to a String (a byte array) manually.
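For example, a minimal sketch of that fix inside the question's main() (assuming the rest of HundredKRecords stays unchanged): join the lines into one String and pass that to kafka(...), instead of printing the stream and then calling toString() on it.

import java.util.stream.Collectors;   // additional import needed for Collectors.joining

// read file into stream, try-with-resources
try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
    // join all lines into a single String; the stream is consumed exactly once
    String content = stream.collect(Collectors.joining("\n"));
    kafka(content);   // send the file content, not the Stream object's toString()
} catch (IOException e) {
    e.printStackTrace();
}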
Note that Kafka has a limit on message size.
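If the file is too large to fit in a single message, an alternative sketch (still using the legacy kafka.javaapi.producer API from the question) is to create the producer once and send each line as its own message. The helper below, kafkaPerLine, is hypothetical and not part of the original code.

public static void kafkaPerLine(String fileName) {   // hypothetical helper for illustration
    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
    props.put("request.required.acks", "1");

    Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        // one KeyedMessage per line keeps each message well below the broker's size limit
        stream.forEach(line -> producer.send(new KeyedMessage<String, String>("test", line)));
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        producer.close();
    }
}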