我有这样一个数据流:
DataStream[myTuple(topic, value)]
我想在相关主题中发送一个特定值。
所以我试着这样做:
new FlinkKafkaProducer010[myTuple](
"default_topic",
new KeyedSerializationSchema[myTuple](){
override def getTargetTopic(element: myTuple): String = element.topic
override def serializeKey(element: myTuple): Array[Byte] = null
override def serializeValue(element: myTuple): Array[Byte] = new SimpleStringSchema().serialize(element.value)
},
properties)
但它不起作用,我有一个警告:
WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase - Overwriting the 'key.serializer' is not recommended
WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase - Overwriting the 'value.serializer' is not recommended
我不知道怎么做,换个方法。谢谢你的帮助。
1条答案
按热度按时间sirbozc51#
你可能会
key.serializer
以及value.serializer
在你的房子里。不应该这样做,因为这样会覆盖序列化程序(ByteArraySerializer
s) Flink内部使用。删除这些属性,你的代码就可以运行了。