Why is this not writing from the RDD to the Kafka topic? I don't know what I'm doing wrong. Basically, I want to receive a message, do some filtering, and then write the result to another Kafka topic. How should I initialize a Kafka producer from within a Spark RDD?

PS: I want to do this with Kafka's own producer API, not with a third-party wrapper such as Cloudera's Spark Kafka Writer.
JavaPairInputDStream<byte[], byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
        byte[].class, byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaParams, topics);

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        // NOTE: a producer is created and closed for every single record here;
        // see the foreachPartition sketch below for a cheaper variant.
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
        // record._1 holds the raw Avro key bytes, record._2 the raw Avro value bytes
        Decoder decoderRec1 = DecoderFactory.get().binaryDecoder(record._1, null);
        Decoder decoderRec2 = DecoderFactory.get().binaryDecoder(record._2, null);
        GenericRecord messageValue = null;
        GenericRecord messageKey = null;
        try {
            messageValue = reader.read(null, decoderRec2);
            messageKey = reader.read(null, decoderRec1);
            ProducerRecord<String, String> record1 = new ProducerRecord<>("outputTopic", "myKey",
                    "Key: " + messageKey + " Value: " + messageValue.toString());
            // block until the broker acknowledges the send, so failures surface here
            Future<RecordMetadata> sent = producer.send(record1);
            sent.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Received: " + messageValue + " and key: " + messageKey);
        producer.close();
    });
});

ssc.start();
ssc.awaitTermination();
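For reference, the usual pattern for writing from Spark to Kafka with Kafka's own API is to create one producer per partition rather than per record, via foreachPartition: constructing a KafkaProducer is expensive, and the producer is not serializable, so it has to be created on the executors in any case. A minimal sketch against the same directKafkaStream, kafkaProps, and "outputTopic" as above; the placeholder string stands in for the decoded Avro payload:

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreachPartition(partition -> {
        // One producer per partition, created on the executor. KafkaProducer is
        // not serializable, so it cannot be built on the driver and shipped out.
        KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
        partition.forEachRemaining(record -> {
            // record._1 / record._2 are the raw Avro key/value bytes;
            // decode and filter them here as in the snippet above.
            producer.send(new ProducerRecord<>("outputTopic", "myKey",
                    "decoded-value-goes-here"));
        });
        producer.flush();   // block until all buffered sends complete
        producer.close();
    });
});

Note that kafkaProps must be serializable for the closure to capture it; a plain java.util.Properties is.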
EDIT: Here is a working example:
// Avro decoding is done once in the map step, before the producer loop.
JavaDStream<InputMessage> inputMessageStream =
        directKafkaStream.map(avroRecord -> InputMessageTranslator.decodeAvro(avroRecord._2));

inputMessageStream.foreachRDD(rdd -> {
    rdd.foreach(message -> {
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
        ProducerRecord<String, String> record = new ProducerRecord<>(
                startProps.getProperty(InputPropertyKey.OUTPUT_TOPIC.toString()),
                "myKey", "" + message.getSessionStartTime());
        producer.send(record);
        producer.close();
    });
});
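This still opens and closes a producer for every message. A common refinement, not part of the original post, is to keep a single producer per executor JVM behind a lazily initialized holder, so it is created once and reused across batches. A minimal sketch, assuming the same kafkaProps; the ProducerHolder name and shape are illustrative, not from the post:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical helper: one KafkaProducer per executor JVM, created lazily on first use.
public final class ProducerHolder {
    private static KafkaProducer<String, String> producer;

    public static synchronized KafkaProducer<String, String> get(Properties kafkaProps) {
        if (producer == null) {
            producer = new KafkaProducer<>(kafkaProps);
            // close the producer when the executor JVM shuts down
            Runtime.getRuntime().addShutdownHook(new Thread(() -> producer.close()));
        }
        return producer;
    }
}

Inside rdd.foreach, ProducerHolder.get(kafkaProps).send(record) would then replace the per-message new KafkaProducer(...) / producer.close() pair; the shutdown hook takes care of closing.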