为什么我的producerbuilder类在.net核心控制台中的消息没有被kafka consumer shell使用?

yh2wf1be  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(601)

我在ubuntu上安装了kafka,并用shell文件测试了一个简单的场景:
使用默认配置启动zookeeper
使用默认配置启动节点或代理
创建了一个主题,给它一个名称,单个分区和复制因子为1,并将其与zookeeper默认地址相关联
一个生产者和一个消费者: bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname 然后: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopicname --from-beginning 一切正常,一切正常。现在,我想关闭producer并在asp.net核心应用程序中为producer编写代码。

using System;
using System.ComponentModel;
using System.Net;
using Confluent.Kafka;

    namespace KafkaTraining
    {
        class Program
        {
            static void Main(string[] args)
            {
                var config = new ProducerConfig
                {
                    BootstrapServers = "localhost:9092"
                };

                using (var producer = new ProducerBuilder<string, string>(config).Build())
                {
                    producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
                }
                Console.ReadLine();
            }
        }
    }

灵感来源://https://docs.confluent.io/current/clients/dotnet.html#
因此,上面的c代码应该与shell命令等价,对吧,加上我用c编写的消息,应该由使用者使用(由于shell命令,使用者仍然在终端窗口中打开)。
如果我之前发布的shell命令有效,即 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname 只是关于localhost:9092 (服务地址)和主题名,为什么c#程序不能成功地替代它,它应该产生消息“a log message”,终端应该使用它,但是没有。我怎么调试这个呢?
另外,我在linux ubuntu上安装了kafka,地址如下:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.12-2.6.0.tgz而在我的asp.net core 3.1控制台应用程序中,我已经安装了您可以看到的版本1.5.0,不确定这是否起作用,但没有关于如何开始调试的线索。。。任何指示都可以。

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <StartupObject>KafkaDemo.Program</StartupObject>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Confluent.Kafka" Version="1.5.0" />
  </ItemGroup>

</Project>
ftf50wuq

ftf50wuq1#

没有生成您的邮件,因为您在传递邮件之前处理了生产者。 Produce 方法以异步方式发送消息,并且不等待响应—它会立即返回。为了确保它被发送,您可以使用 Flush() -在所有飞行中的信息都被传送之前,或者你可以使用 await ProduceAsync() .
试试这个:

using (var producer = new ProducerBuilder<string, string>(config).Build())
{
    producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
    producer.Flush();
}

相关问题