把钥匙交给flink kafka制片人

lfapxunr  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(472)

我是新的Flink流处理,并需要一些帮助与FlinkKafka生产者,因为找了一段时间后,找不到太多相关的。我目前正在读Kafka主题的流,然后在执行一些计算后,我想写这个新的Kafka单独的主题。但我面临的问题是,我不能发送Kafka主题的关键。我使用的是FlinkKafka连接器,它给我提供了flinkkafkaconsumer和flinkkafkaproducer。更详细的看下面是我的代码,我可以改变我的代码,它可以工作,目前在Kafka,我正在生产我的消息去与null键,其中作为值是我需要的:

Properties consumerProperties = new Properties();

    consumerProperties.setProperty("bootstrap.servers", serverURL);
    consumerProperties.setProperty("group.id", groupID);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(consumerTopicName,
            new SimpleStringSchema(), consumerProperties);

    kafkaConsumer.setStartFromEarliest();
    DataStream<String> kafkaConsumerStream = env.addSource(kafkaConsumer);
    final int[] tVoteCount = {0};

    DataStream<String> kafkaProducerStream = kafkaConsumerStream.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws InterruptedException, IOException {
            JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
            Tcount = Tcount + jsonNode.get(key1).asInt();
            int nameCandidate = jsonNode.get(key2).asInt();
            System.out.println(Tcount);
            String tCountT = Integer.toString(Tcount);
            //tVoteCount = tVoteCount + voteCount;

            //waitForEventTime(timeStamp);
            return tCountT;
        }
    });
    kafkaConsumerStream.print();
    System.out.println("sdjknvksjdnv"+Tcount);
    Properties producerProperties = new Properties();
    producerProperties.setProperty("bootstrap.servers", serverURL);
    FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(producerTopicName,
            new SimpleStringSchema(), producerProperties);
    kafkaProducerStream.addSink(kafkaProducer);
    env.execute();

谢谢。

7fyelxc5

7fyelxc51#

如果您提供自己的kafkaserializationschema而不是使用 SimpleStringSchema ,那么您就可以完全控制所写的内容@迈克在他的回答中提供了一个如何做到这一点的例子。

kxxlusnw

kxxlusnw2#

在本博客中,您将看到一个关于如何将关键字和主题写入主题的示例:
你需要替换你创建的 new FlinkKafkaProducer 以下是:

FlinkKafkaProducer<KafkaRecord> kafkaProducer = 
  new FlinkKafkaProducer<KafkaRecord>(
    producerTopicName, 
    ((record, timestamp) -> new ProducerRecord<byte[], byte[]>(producerTopicName, record.key.getBytes(), record.value.getBytes())), 
    producerProperties
  );

相关问题