CloudEventEventValidator是否将自定义扩展(Kafka标头)封装?

5gfr0r5j  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(179)

我使用io.cloudevents.core.v1.CloudEventBuilder builder构建了cloudevent示例,并使用builder.withExtension("whatever", "whatever");方法对其进行了自定义扩展。Cloudevent已构建,已发送。我可以看到,在Kafka中,有相应的消息,它的头“whatever”具有值“whatever”。到目前为止还不错。
但是如果我将给定的消息封装到io.cloudevents.CloudEvent示例中,头就不在“那里”。我希望它在io.cloudevents.CloudEventExtensions#getExtensionNames resp。io.cloudevents.CloudEventExtensions#getExtension但它不在那里或其他任何地方。
深入研究io.cloudevents.kafka.CloudEventDeserializer,我们可以深入研究io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl#read,如下所示。
调用这个方法,我们会在这里找到我们的头,key==value==“whatever”。所以value不为null,我们继续,它不是“content-type”头,所以我们继续到else分支,它没有“ce_”前缀(isCloudEventsHeader),我们完成了。已忽略自定义扩展。
这是预期会发生的吗?我是不是忽略了什么?我希望,如果我能够使用自定义扩展创建Cloudevent,我也想阅读它。是否有一些特殊的配置或我忘记启用的东西?

@Override
    public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
        CloudEventWriter<V> visitor = writerFactory.create(this.version);

        // Grab from headers the attributes and extensions
        // This implementation avoids to use visitAttributes and visitExtensions
        // in order to complete the visit in one loop
        this.forEachHeader((key, value) -> {
            if (value == null) {
                return;
            }
            if (isContentTypeHeader(key)) {
                visitor.withContextAttribute(CloudEventV1.DATACONTENTTYPE, toCloudEventsValue(value));
            } else if (isCloudEventsHeader(key)) {
                String name = toCloudEventsKey(key);
                if (name.equals(CloudEventV1.SPECVERSION)) {
                    return;
                }
                visitor.withContextAttribute(name, toCloudEventsValue(value));
            }
        });
woobm2wo

woobm2wo1#

TLDR:即使是自定义属性也必须遵循与ce_type等“官方”属性相同的命名约定。自定义属性“whatever”不正确,它必须是“ce_whatever”(或cloudevens_whatever,取决于您使用它的位置)。
在我们的项目中,我们有多个生产者,生产这个特定记录的是kafka-connect,它配置错误。我误解了规范,即使是自定义扩展也必须有“ce_”前缀。我认为这条规则只适用于特定于云的已知头文件。所以我修复了kafka-connect配置,果然一切正常。

相关问题