Using Kafka and Schema Registry containers together in dotnet Testcontainers

aamkag61 posted 12 months ago in Apache

I can't get a Kafka container to work alongside a Schema Registry container in Testcontainers. Here is my setup:

private readonly KafkaContainer _kafkaContainer = new KafkaBuilder()
    .WithImage("confluentinc/cp-kafka:7.4.1")
    .WithHostname("broker")
    .WithName("broker")
    .WithNetworkAliases("broker")
    .WithNetwork(NetworkName)
    .WithPortBinding(9092, 9092)
    .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(9092))
    .WithEnvironment(
        new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
        {
            { "KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092" }
        }))
    .WithCleanUp(true)
    .Build();

private readonly IContainer _schemaRegistryContainer = new ContainerBuilder()
    .WithImage("confluentinc/cp-schema-registry:7.4.1")
    .WithHostname("schema-registry")
    .WithName("schema-registry")
    .WithCleanUp(true)
    .WithEnvironment(
        new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
        {
            { "SCHEMA_REGISTRY_HOST_NAME", "schema-registry" },
            { "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:29092" },
            { "SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081" }
        }))
    .WithPortBinding(8081, 8081)
    .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(8081))
    .WithNetwork(NetworkName)
    .Build();

This is what I see in the Schema Registry container's logs:

2023-12-21 13:27:21 [2023-12-21 12:27:21,315] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (broker/172.27.0.2:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-12-21 13:27:22 [2023-12-21 12:27:22,364] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-12-21 13:27:22 [2023-12-21 12:27:22,364] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (broker/172.27.0.2:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-12-21 13:27:23 [2023-12-21 12:27:23,604] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-12-21 13:27:23 [2023-12-21 12:27:23,604] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (broker/172.27.0.2:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-12-21 13:27:24 [2023-12-21 12:27:24,741] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-12-21 13:27:24 [2023-12-21 12:27:24,742] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (broker/172.27.0.2:29092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-12-21 13:27:25 [2023-12-21 12:27:25,772] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)


What should I change to get them working together?
Thank you,

bzzcjhmw #1

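I don't understand 100% why it has to be this way, or why other configurations don't work. But this is a setup that starts both containers successfully: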

private static readonly INetwork network = new NetworkBuilder()
    .WithName(Guid.NewGuid().ToString("D"))
    .Build();

private static readonly KafkaContainer _kafkaContainer = new KafkaBuilder()
    .WithImage("confluentinc/cp-kafka:7.4.1")
    .WithNetworkAliases("kafka")
    .WithNetwork(network)
    .WithEnvironment(new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
    {
        { "KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL", "http://schemaregistry:8085" }
    }))
    .WithCleanUp(true)
    .Build();

private static readonly IContainer _schemaRegistryContainer = new ContainerBuilder()
    .WithImage("confluentinc/cp-schema-registry:7.4.1")
    .WithCleanUp(true)
    .WithEnvironment(
        new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
        {
            { "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9093" },
            { "SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT" },
            { "SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085" },
            { "SCHEMA_REGISTRY_HOST_NAME", "schemaregistry" }
        }))
    .WithPortBinding(8085, true)
    .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("Server started, listening for requests"))
    .WithNetwork(network)
    .WithNetworkAliases("schemaregistry")
    .DependsOn(_kafkaContainer)
    .Build();

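Note that KafkaBuilder already exposes the ports and configures the wait strategy and other such details, so we only add the network information and tell it that it needs a schema registry.
The Schema Registry container needs more configuration, because we don't have a module for it (though we probably should have one, or something like an extension of the Kafka module that adds a Schema Registry container to the setup).
I also changed the port to match the Java example I shared in the comments.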
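
Because 8085 is bound to a random host port (`WithPortBinding(8085, true)`), test code has to look the mapped port up at runtime. Below is a rough sketch of how the pieces above might be wrapped and started. The class name `KafkaWithSchemaRegistry`, its members, and the `Example` class are hypothetical and only for illustration. I am assuming a recent Testcontainers for .NET version with the `Testcontainers.Kafka` package, and that `kafka:9093` is the internal listener the Kafka module sets up, which is what the configuration above relies on.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;
using Testcontainers.Kafka;

// Hypothetical wrapper; the name and members are illustrative, not part of any library.
public sealed class KafkaWithSchemaRegistry : IAsyncDisposable
{
    private const int SchemaRegistryPort = 8085;

    private readonly INetwork _network = new NetworkBuilder()
        .WithName(Guid.NewGuid().ToString("D"))
        .Build();

    private readonly KafkaContainer _kafka;
    private readonly IContainer _schemaRegistry;

    public KafkaWithSchemaRegistry()
    {
        _kafka = new KafkaBuilder()
            .WithImage("confluentinc/cp-kafka:7.4.1")
            .WithNetwork(_network)
            .WithNetworkAliases("kafka")
            .WithEnvironment("KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL", "http://schemaregistry:8085")
            .Build();

        _schemaRegistry = new ContainerBuilder()
            .WithImage("confluentinc/cp-schema-registry:7.4.1")
            .WithNetwork(_network)
            .WithNetworkAliases("schemaregistry")
            .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9093")
            .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT")
            .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085")
            .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry")
            .WithPortBinding(SchemaRegistryPort, true) // random host port
            .WithWaitStrategy(Wait.ForUnixContainer()
                .UntilMessageIsLogged("Server started, listening for requests"))
            .DependsOn(_kafka)
            .Build();
    }

    // Address for Kafka clients running on the host (the test process).
    public string BootstrapServers => _kafka.GetBootstrapAddress();

    // Host-side URL of the Schema Registry, built from the randomly mapped port.
    public Uri SchemaRegistryUri => new UriBuilder(
        Uri.UriSchemeHttp,
        _schemaRegistry.Hostname,
        _schemaRegistry.GetMappedPublicPort(SchemaRegistryPort)).Uri;

    public async Task StartAsync()
    {
        // Explicit ordering: network first, then the broker, then the registry.
        await _network.CreateAsync();
        await _kafka.StartAsync();
        await _schemaRegistry.StartAsync();
    }

    public async ValueTask DisposeAsync()
    {
        await _schemaRegistry.DisposeAsync();
        await _kafka.DisposeAsync();
        await _network.DisposeAsync();
    }
}

public static class Example
{
    public static async Task RunAsync()
    {
        await using var env = new KafkaWithSchemaRegistry();
        await env.StartAsync();

        // GET /subjects is a Schema Registry REST endpoint that lists registered subjects.
        using var http = new HttpClient { BaseAddress = env.SchemaRegistryUri };
        Console.WriteLine(await http.GetStringAsync("subjects"));
    }
}
```

Inside the Docker network the registry is still reachable as `http://schemaregistry:8085`. The mapped port only matters for code running on the host, such as the test process itself.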
