Why doesn't writing to a Kafka topic from a Spark RDD work?

3phpmpom  published 2021-06-07  in Kafka

Why doesn't this code write the records of the RDD to the Kafka topic? I can't tell what I'm doing wrong. Basically, I want to receive a message, do some filtering, and then write the result to another Kafka topic. How should I initialize a Kafka producer when working with a Spark RDD?

PS: I want to do this with Kafka's own producer API, not with a third-party wrapper such as Cloudera's Spark Kafka writer.

JavaPairInputDStream<byte[], byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
        byte[].class, byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
        Decoder decoderRec1 = DecoderFactory.get().binaryDecoder(record._1, null);
        Decoder decoderRec2 = DecoderFactory.get().binaryDecoder(record._2, null);
        GenericRecord messageValue = null;
        GenericRecord messageKey = null;
        try {
            messageValue = reader.read(null, decoderRec2);
            messageKey = reader.read(null, decoderRec1);
            ProducerRecord<String, String> record1 = new ProducerRecord<>("outputTopic", "myKey",
                    "Key: " + messageKey + " Value: " + messageValue.toString());
            Future<RecordMetadata> sent = producer.send(record1);
            sent.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Received: " + messageValue + " and key: " + messageKey);
        producer.close();
    });
});
ssc.start();
ssc.awaitTermination();

Here, by contrast, is an example that does work:

JavaDStream<InputMessage> inputMessageStream =
        directKafkaStream.map(avroRecord -> InputMessageTranslator.decodeAvro(avroRecord._2));
inputMessageStream.foreachRDD(rdd -> {
    rdd.foreach(message -> {
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
        ProducerRecord<String, String> record = new ProducerRecord<>(
                startProps.getProperty(InputPropertyKey.OUTPUT_TOPIC.toString()),
                "myKey", "" + message.getSessionStartTime());
        producer.send(record);
        producer.close();
    });
});
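As an aside on the "how should I initialize a Kafka producer" part of the question: both snippets above open and close a `KafkaProducer` for every single record, which is expensive and makes it easy for buffered sends to be dropped on `close()`. A common pattern is to use `foreachPartition` so one producer is created per partition on the executor. The sketch below assumes the `inputMessageStream`, `InputMessage`, and topic name from the question; the `bootstrap.servers` value is a placeholder and this is not runnable without a Spark context and a reachable Kafka broker.

```java
// Sketch only: one producer per partition instead of one per record.
// Assumes inputMessageStream and InputMessage from the question above.
inputMessageStream.foreachRDD(rdd -> {
    rdd.foreachPartition(partition -> {
        // Built on the executor, so nothing non-serializable is captured
        // from the driver. Broker address is an assumption.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            while (partition.hasNext()) {
                InputMessage message = partition.next();
                producer.send(new ProducerRecord<>("outputTopic",
                        "myKey", "" + message.getSessionStartTime()));
            }
            // send() is asynchronous; flush before closing so buffered
            // records actually reach the broker.
            producer.flush();
        } finally {
            producer.close();
        }
    });
});
```

With this structure, producer creation cost is paid once per partition per batch, and the explicit `flush()` makes it easier to see whether records are reaching the broker at all.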
