kafka的tpl性能

cgfeq70w  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(222)

halo,我无法通过tpl数据流获得任何改进的性能,我想知道我是否用错了它。
下面的应用程序执行以下操作:
从Kafka主题中提取消息
将此消息解析为 Foo 对象 ParseData() 序列化此 Foo 转换为json
然后将json发布到一个新的kafka主题。
一些单线程统计信息: ParseData 可以将字符串解析为 Foo 100 msg/sec(单线程测试) SerializeMessage 可以做200 Foos /sec(单线程测试)
使用kafka消息(跳过所有解析/序列化)可以处理超过2000个msg/秒
基于此,我希望利用第三方物流来提高吞吐量。我的最大吞吐量应该接近Kafka限制2000毫秒/秒。
但是,我没有看到吞吐量方面的任何改进,我正在一台有12个物理内核(24wht)的机器上运行应用程序。当我打印出每个块的队列大小时 transformBlock 总是在1000左右,但是其他的都在10以下,这让我相信transformblock没有利用多核系统。
我是否设置了tpl数据流以正确利用并行性?

app = new App();
await app.Start(new[]{"consume-topic"}, cancelSource);

// App class
async Task Start(IEnumerable<string> topics, CancellationTokenSource cancelSource) {
    transformBlock = new TransformBlock<string, Foo>(TransformKafkaMessage,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 8,
            BoundedCapacity = 1000,
            SingleProducerConstrained = true,
        });
    serializeBlock = new TransformBlock<Foo, string>(SerializeMessage,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            BoundedCapacity = 1000,
            SingleProducerConstrained = true,
        });
   publishBlock = new ActionBlock<JsonMessage>(PublishJson,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 1000,
            SingleProducerConstrained = true
        }); 

    // Setup the pipeline
    transformBlock.LinkTo(serializeBlock);
    serializeBlock.LinkTo(publishBlock);

    // Start Kafka Listener loop
    consumer.Subscribe(topics);
    while(true) {
        var result = consumer.Consume(cancelSource.Token);
        await ProcessMessage<Ignore, string>(result);
    }
}

// send the content of the kafka message to transform block
async Task ProcessMessage<TKey, TValue>(ConsumeResult<TKey, string> msg) {
    var result = await transformBlock.SendAsync(msg.Value);
}

// Convert the raw string data into an object
Foo TransformKafkaMessage(string data) {
    // Note this ParseData() function can process about 100 items per sec
    // in local single threaded testing
    Foo foo = ParseData(data);
    return foo;
}

// Serialize the new Foo into JSON
string SerializeMessage(Foo foo) {
    // The serializer can process about 200 msgs/sec (single threaded test)
    var json = foo.Serialize();
    return json;
}

// publish new message back to Kafka
void PublishJson(string json) {
    // Create a Confluent.Kafka Message
    var kafkaMessage = new Message<Null, string> {
        Value = json
    };
    producer.Produce("produce-topic", kafkaMessage);
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题