halo,我无法通过tpl数据流获得任何改进的性能,我想知道我是否用错了它。
下面的应用程序执行以下操作:
从Kafka主题中提取消息
将此消息解析为 Foo
对象 ParseData()
序列化此 Foo
转换为json
然后将json发布到一个新的kafka主题。
一些单线程统计信息: ParseData
可以将字符串解析为 Foo
100 msg/sec(单线程测试) SerializeMessage
可以做200 Foos
/sec(单线程测试)
使用kafka消息(跳过所有解析/序列化)可以处理超过2000个msg/秒
基于此,我希望利用第三方物流来提高吞吐量。我的最大吞吐量应该接近Kafka限制2000毫秒/秒。
但是,我没有看到吞吐量方面的任何改进,我正在一台有12个物理内核(24wht)的机器上运行应用程序。当我打印出每个块的队列大小时 transformBlock
总是在1000左右,但是其他的都在10以下,这让我相信transformblock没有利用多核系统。
我是否设置了tpl数据流以正确利用并行性?
app = new App();
await app.Start(new[]{"consume-topic"}, cancelSource);
// App class
async Task Start(IEnumerable<string> topics, CancellationTokenSource cancelSource) {
transformBlock = new TransformBlock<string, Foo>(TransformKafkaMessage,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 8,
BoundedCapacity = 1000,
SingleProducerConstrained = true,
});
serializeBlock = new TransformBlock<Foo, string>(SerializeMessage,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
BoundedCapacity = 1000,
SingleProducerConstrained = true,
});
publishBlock = new ActionBlock<JsonMessage>(PublishJson,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 1000,
SingleProducerConstrained = true
});
// Setup the pipeline
transformBlock.LinkTo(serializeBlock);
serializeBlock.LinkTo(publishBlock);
// Start Kafka Listener loop
consumer.Subscribe(topics);
while(true) {
var result = consumer.Consume(cancelSource.Token);
await ProcessMessage<Ignore, string>(result);
}
}
// send the content of the kafka message to transform block
async Task ProcessMessage<TKey, TValue>(ConsumeResult<TKey, string> msg) {
var result = await transformBlock.SendAsync(msg.Value);
}
// Convert the raw string data into an object
Foo TransformKafkaMessage(string data) {
// Note this ParseData() function can process about 100 items per sec
// in local single threaded testing
Foo foo = ParseData(data);
return foo;
}
// Serialize the new Foo into JSON
string SerializeMessage(Foo foo) {
// The serializer can process about 200 msgs/sec (single threaded test)
var json = foo.Serialize();
return json;
}
// publish new message back to Kafka
void PublishJson(string json) {
// Create a Confluent.Kafka Message
var kafkaMessage = new Message<Null, string> {
Value = json
};
producer.Produce("produce-topic", kafkaMessage);
}
暂无答案!
目前还没有任何答案,快来回答吧!