我使用io.cloudevents.core.v1.CloudEventBuilder builder
构建了cloudevent示例,并使用builder.withExtension("whatever", "whatever");
方法对其进行了自定义扩展。Cloudevent已构建,已发送。我可以看到,在Kafka中,有相应的消息,它的头“whatever”具有值“whatever”。到目前为止还不错。
但是如果我将给定的消息封装到io.cloudevents.CloudEvent
示例中,头就不在“那里”。我希望它在io.cloudevents.CloudEventExtensions#getExtensionNames
resp。io.cloudevents.CloudEventExtensions#getExtension
但它不在那里或其他任何地方。
深入研究io.cloudevents.kafka.CloudEventDeserializer
,我们可以深入研究io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl#read
,如下所示。
调用这个方法,我们会在这里找到我们的头,key==value==“whatever”。所以value不为null,我们继续,它不是“content-type”头,所以我们继续到else分支,它没有“ce_”前缀(isCloudEventsHeader),我们完成了。已忽略自定义扩展。
这是预期会发生的吗?我是不是忽略了什么?我希望,如果我能够使用自定义扩展创建Cloudevent,我也想阅读它。是否有一些特殊的配置或我忘记启用的东西?
@Override
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
CloudEventWriter<V> visitor = writerFactory.create(this.version);
// Grab from headers the attributes and extensions
// This implementation avoids to use visitAttributes and visitExtensions
// in order to complete the visit in one loop
this.forEachHeader((key, value) -> {
if (value == null) {
return;
}
if (isContentTypeHeader(key)) {
visitor.withContextAttribute(CloudEventV1.DATACONTENTTYPE, toCloudEventsValue(value));
} else if (isCloudEventsHeader(key)) {
String name = toCloudEventsKey(key);
if (name.equals(CloudEventV1.SPECVERSION)) {
return;
}
visitor.withContextAttribute(name, toCloudEventsValue(value));
}
});
1条答案
按热度按时间woobm2wo1#
TLDR:即使是自定义属性也必须遵循与ce_type等“官方”属性相同的命名约定。自定义属性“whatever”不正确,它必须是“ce_whatever”(或cloudevens_whatever,取决于您使用它的位置)。
在我们的项目中,我们有多个生产者,生产这个特定记录的是kafka-connect,它配置错误。我误解了规范,即使是自定义扩展也必须有“ce_”前缀。我认为这条规则只适用于特定于云的已知头文件。所以我修复了kafka-connect配置,果然一切正常。