product在配置了iasyncserializer值序列化程序的情况下调用,但在使用avro序列化程序时需要iserializer

f0brbegy  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(427)

我正在使用kafka集群并使用事务生产者进行原子流(读-写过程)。

// Init Transactions
                _transactionalProducer.InitTransactions(DefaultTimeout);

                // Begin the transaction
                _transactionalProducer.BeginTransaction();

                // produce message to one or many topics
                var topic = Topics.MyTopic;
                _transactionalProducer.Produce(topic, consumeResult.Message);

我使用avroserializer,因为我用schema发布消息。
PRODUCT引发异常:

"System.InvalidOperationException: Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.\r\n   at Confluent.Kafka.Producer`2.Produce(TopicPartition topicPartition, Message`2 message, Action`1 deliveryHandler)"

我看到的所有事务生产者示例都使用produce方法而不是produceasync,所以我不确定是否可以简单地切换到produceasync并假设事务生产者将正常工作。如果我错了,请纠正我或帮助我查找文档。
否则,我找不到从iserializer继承的非异步avroserializer。

public class AvroSerializer<T> : IAsyncSerializer<T>
vs3odd8k

vs3odd8k1#

我不知道有 AsSyncOverAsync 创建序列化程序时可以使用的方法。这是因为kafka使用者仍然是同步的而不是异步的。
例如:

new AvroSerializer<TValue>(schemaRegistryClient, serializerConfig).AsSyncOverAsync();

下面是该方法的综合文档。

//
        // Summary:
        //     Create a sync serializer by wrapping an async one. For more information on the
        //     potential pitfalls in doing this, refer to Confluent.Kafka.SyncOverAsync.SyncOverAsyncSerializer`1.
        public static ISerializer<T> AsSyncOverAsync<T>(this IAsyncSerializer<T> asyncSerializer);

相关问题