bounty将在6天后过期。回答此问题可获得+50声望奖励。user4923462希望引起更多人关注此问题。
Kafka在一个主题上只向一个分区发送消息。我在FlinkKafkaProducer09中使用了KeyedSerializationSchema,并从事件流中传递了一个属性,该属性将用于执行哈希分区选择(因为我希望每次来自一种属性类型的所有事件都进入一个特定的分区)。当我发布属于50种不同属性类型的消息时,我看到它们都进入了同一个分区,我期待着某种形式的负载平衡,它基于Kafka的基于属性的分区选择。
DataStream<JsonObject> myEvents = ....;
FlinkKafkaProducer09<JsonObject> myProducer = new FlinkKafkaProducer09<>(myTopic, new myImplementationOfKeyedSerializationSchema("attributeNameToUseForPartition"), kafkaproperties);
myEvents.addSink(myProducer).setParallelism(1).name("mySink");
....
class myImplementationOfKeyedSerializationSchema implements KeyedSerializationSchema<JsonObject>
{
public myImplementationOfKeyedSerializationSchema (String messageKey) {
this.messageKey = messageKey;
}
@Override
public byte[] serializeKey(JsonObject event) {
return event.get(messageKey).toString().getBytes();
}
@Override
public byte[] serializeValue(JsonObject event) {
return event.toString().getBytes();
}
@Override
public String getTargetTopic(JsonObject event) {
return null;
}
}
我无法弄清楚为什么分区选择没有发生。1.1.4
1条答案
按热度按时间nwlls2ji1#
我还没有深入研究代码来了解Flink是如何配置Kafka生产者的,因为你没有提供一个显式的分区器,但是在使用Flink时依赖默认的Kafka行为通常是有问题的,因为Flink覆盖了很多Kafka行为。
至少对于较新的
KafkaSink
,您可以指定KafkaRecordSerializationSchema
使用的分区程序,您可以通过KafkaRecordSerializationSchemaBuilder.setPartitioner()
方法设置该分区程序。