How do I use Kafka as a test service on a CI server?

3df52oht  posted on 2021-06-04  in Kafka

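Has anyone successfully set up Kafka as a service in a CI environment? I am trying to use the lensesio/fast-data-dev Docker image. I have also tried the Confluent images, see below.
I have the following CI job that runs the tests. ADV_HOST is set so that the Kafka container can be reached through its GitLab CI service alias on the same Docker network. The job hangs when it tries to contact the Kafka container.
GitLab CI job using lensesio/fast-data-dev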

    tests:
      stage: test
      variables:
        ADV_HOST: kafka
        DISABLE: azure-documentdb,blockchain,bloomberg,cassandra,coap,druid,elastic,elastic5,ftp,hazelcast,hbase,influxdb,jms,kudu,mongodb,mqtt,redis,rethink,voltdb,yahoo,hdfs,jdbc,elasticsearch,s3,twitter
        CONNECT_HEAP: 512m
        MINIO_BUCKET: images
        SAMPLEDATA: 0
        REST_PORT: 8082
        FORWARDLOGS: 0
        RUNTESTS: 0
        DISABLE_JMX: 1
        WEB_PORT: 0
        # NOTE: duplicate of the DISABLE key above; many YAML parsers keep only the last value
        DISABLE: hive-1.1
      ##
      # Services
      # - Kafka
      # - Mosquitto (MQTT)
      # - Minio (S3)
      ##
      services:
        - name: lensesio/fast-data-dev:2.5.1-L0
          alias: kafka
        - name: dcs3spp/minio:version-1.0.2
          alias: minio
        - name: eclipse-mosquitto:1.6.9
          alias: mqtt
      script:
        - >
          dotnet test --no-restore
          --logger:trx
          --settings:Tests/coverlet.runsettings
          --collect:"XPlat Code Coverage"
          WebApp.sln

The CI job hangs while trying to contact the Kafka container

    info: WebApp.Kafka.Admin.KafkaAdminService[0]
          Admin service trying to create Kafka Topic...
    info: WebApp.Kafka.Admin.KafkaAdminService[0]
          Topic::eventbus, ReplicationCount::1, PartitionCount::3
    info: WebApp.Kafka.Admin.KafkaAdminService[0]
          Bootstrap Servers::kafka:9092

GitLab CI job using the Confluent Kafka Docker images

    tests:
      stage: test
      variables:
        ZOOKEEPER_CLIENT_PORT: 2181
        ZOOKEEPER_TICK_TIME: 2000
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      ##
      # Services
      # - Kafka
      # - Mosquitto (MQTT)
      # - Minio (S3)
      ##
      services:
        - name: confluentinc/cp-zookeeper:5.1.0
          alias: zookeeper
        - name: confluentinc/cp-kafka:5.1.0
          alias: kafka
        - name: dcs3spp/minio:version-1.0.2
          alias: minio
        - name: eclipse-mosquitto:1.6.9
          alias: mqtt
      script:
        - >
          dotnet test --no-restore
          --logger:trx
          --settings:Tests/coverlet.runsettings
          --collect:"XPlat Code Coverage"
          WebApp.sln

Error log with the Confluent images

    2020-11-15T12:15:44.558992574Z [2020-11-15 12:15:44,536] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.558996094Z [2020-11-15 12:15:44,536] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.558999139Z [2020-11-15 12:15:44,542] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.559002735Z [2020-11-15 12:15:44,542] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.559006873Z [2020-11-15 12:15:44,542] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.559009945Z [2020-11-15 12:15:44,542] INFO Server environment:os.version=4.19.78-coreos (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.559012987Z [2020-11-15 12:15:44,542] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.559015990Z [2020-11-15 12:15:44,542] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.559019219Z [2020-11-15 12:15:44,544] INFO Server environment:user.dir=/ (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.646950748Z [2020-11-15 12:15:44,642] INFO tickTime set to 2000 (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.646965707Z [2020-11-15 12:15:44,642] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.646969289Z [2020-11-15 12:15:44,642] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
    2020-11-15T12:15:44.748601072Z [2020-11-15 12:15:44,725] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
    2020-11-15T12:15:44.780680437Z [2020-11-15 12:15:44,754] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
    *********
    Pulling docker image mcr.microsoft.com/dotnet/core/sdk:3.1-alpine ...
    Using docker image sha256:b0c526e8732fdcf06a1cc277f04523b4d3f10a6554d2b9df855e683524ee7ddf for mcr.microsoft.com/dotnet/core/sdk:3.1-alpine with digest mcr.microsoft.com/dotnet/core/sdk@sha256:3982ac41d8777b78ad7a2efe4c9674338975ebf9a25eeceb943348e45edf91b1 ...
    Preparing environment
    00:01
    ERROR: Job failed (system failure): prepare environment: Error response from daemon: Cannot link to a non running container: /runner-z3wu8uu--project-20752619-concurrent-0-5f56b4323d8253b2-confluentinc__cp-kafka-1 AS /runner-z3wu8uu--project-20752619-concurrent-0-5f56b4323d8253b2-predefined-0/kafka (docker.go:817:0s).

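Is the error caused by the Kafka container trying, at startup, to connect to the Kafka address that I set in the KAFKA_ADVERTISED_LISTENERS variable? For example, is it unable to connect to kafka:29092 because the container has not started yet?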

vngu2lb8  #1

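After reading this aspnetcore issue, I found that the problem was in my IHostedService implementation, which makes the request to Kafka.
Its StartAsync method was performing the work itself and did not return until the request had completed. By design, StartAsync is meant to be fire-and-forget: start the task, then carry on. I changed my KafkaAdmin service into a BackgroundService that overrides the ExecuteAsync method, as listed below. After that, the tests no longer blocked.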

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Confluent.Kafka.Admin;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using KafkaAdmin.Kafka.Config;

    namespace KafkaAdmin.Kafka
    {
        public delegate IAdminClient KafkaAdminFactory(KafkaConfig config);

        /// <summary>Background service that asks Kafka to create a topic</summary>
        public class KafkaAdminService : BackgroundService, IDisposable
        {
            private KafkaAdminFactory _Factory { get; set; }
            private ILogger<KafkaAdminService> _Logger { get; set; }
            private KafkaConfig _Config { get; set; }

            /// <summary>
            /// Retrieve KafkaConfig from appsettings
            /// </summary>
            /// <param name="config">Config POCO from appsettings file</param>
            /// <param name="clientFactory"><see cref="KafkaAdminFactory"/></param>
            /// <param name="logger">Logger instance</param>
            public KafkaAdminService(
                IOptions<KafkaConfig> config,
                KafkaAdminFactory clientFactory,
                ILogger<KafkaAdminService> logger)
            {
                // Each argument (and the wrapped config value) must be non-null.
                _Config = config?.Value ?? throw new ArgumentNullException(nameof(config));
                _Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
                _Logger = logger ?? throw new ArgumentNullException(nameof(logger));
            }

            /// <summary>
            /// Create a Kafka topic if it does not already exist
            /// </summary>
            /// <param name="stoppingToken">Cancellation token signalled when the host is stopping</param>
            /// <exception cref="CreateTopicsException">
            /// Thrown for exceptions encountered except duplicate topic
            /// </exception>
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                using (var client = _Factory(_Config))
                {
                    try
                    {
                        _Logger.LogInformation("Admin service trying to create Kafka Topic...");
                        _Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
                        _Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");

                        await client.CreateTopicsAsync(new TopicSpecification[] {
                            new TopicSpecification {
                                Name = _Config.Topic.Name,
                                NumPartitions = _Config.Topic.PartitionCount,
                                ReplicationFactor = _Config.Topic.ReplicationCount
                            }
                        }, null);

                        _Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
                    }
                    catch (CreateTopicsException e)
                    {
                        // Only one topic is requested, so Results[0] is its result.
                        if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
                        {
                            _Logger.LogError($"An error occurred creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
                            throw; // rethrow without resetting the stack trace
                        }
                        else
                        {
                            _Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
                        }
                    }
                }

                _Logger.LogInformation("Kafka Consumer thread started");
            }

            /// <summary>
            /// Call base class dispose
            /// </summary>
            public override void Dispose()
            {
                base.Dispose();
            }
        }
    }

I still do not understand why the live web app starts successfully. Why is this only a problem with TestServer?
I used the confluentinc/cp-kafka Docker image. An example docker-compose file is:

    ---
    version: "3.8"
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:6.0.0
        hostname: zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
        networks:
          - camnet
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: WARN
      kafka:
        image: confluentinc/cp-kafka:6.0.0
        hostname: kafka
        container_name: kafka
        depends_on:
          - zookeeper
        networks:
          - camnet
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_NUM_PARTITIONS: 3
          KAFKA_HEAP_OPTS: -Xmx512M -Xms512M
          KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
          KAFKA_LOG4J_LOGGERS: "org.apache.zookeeper=WARN,org.apache.kafka=WARN,kafka=WARN,kafka.cluster=WARN,kafka.controller=WARN,kafka.coordinator=WARN,kafka.log=WARN,kafka.server=WARN,kafka.zookeeper=WARN,state.change.logger=WARN"
      netclient-test:
        build:
          context: ../
          dockerfile: docker/Dockerfile
        container_name: netclient-test
        image: dcs3spp/netclient
        networks:
          - camnet
        depends_on:
          - kafka
          - netclient-run
        entrypoint: []
        command:
          - bash
          - -c
          - |-
            echo 'Giving Kafka a bit of time to start up…'
            while ! nc -z kafka 9092;
            do
              sleep 1;
            done;
            echo 'Giving netclient-run a bit of time to start up…'
            while ! nc -z netclient-run 80;
            do
              sleep 1;
            done;
            echo .NET Client test container ready. Running test that uses WebApplicationFactory TestServer to start WebApp with KafkaAdmin background service
            echo This runs successfully in a local development environment on MacOS and Ubuntu Linux 16.04.
            echo This fails when running on a GitLab CI Server. It can be seen that the test server bootstraps the WebApp.....
            echo The KafkaAdmin background service blocks when requesting topic creation from the kafka service
            dotnet test --runtime linux-musl-x64 -c Release --no-restore --nologo tests/KafkaAdmin.Kafka.IntegrationTests/
      netclient-run:
        build:
          context: ../
          dockerfile: docker/Dockerfile
        container_name: netclient-run
        image: dcs3spp/netclient
        networks:
          - camnet
        depends_on:
          - kafka
        entrypoint: []
        command:
          - bash
          - -c
          - |-
            echo 'Giving Kafka a bit of time to start up…'
            while ! nc -z kafka 9092;
            do
              sleep 1;
            done;
            echo .NET Run Web App Ready. Starting WebApp that contains KafkaAdmin background service.
            dotnet run --runtime linux-musl-x64 -c Release --no-restore --project src/KafkaAdmin.WebApp/
    networks:
      camnet:
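
The wait loops in the compose file poll `nc -z` forever, so a broker that never comes up leaves the container (or a CI job) hanging with no message. A bounded variant might look like the sketch below. The function name `wait_for_port` and the 60-second default are hypothetical, not taken from the repository, and the sketch assumes `nc` is available in the image, as it is for the containers in the compose file.

```shell
# wait_for_port HOST PORT [TIMEOUT_SECONDS]
# Hypothetical helper: polls HOST:PORT with `nc -z` once per second.
# Returns 0 as soon as the port accepts a connection, or 1 once
# TIMEOUT_SECONDS (default 60) have elapsed without success.
wait_for_port() {
  host="$1"
  port="$2"
  timeout="${3:-60}"
  elapsed=0
  until nc -z "$host" "$port" 2>/dev/null; do
    if [ "$elapsed" -ge "$timeout" ]; then
      echo "Timed out after ${timeout}s waiting for ${host}:${port}" >&2
      return 1
    fi
    sleep 1
    elapsed=$((elapsed + 1))
  done
  echo "${host}:${port} is accepting connections"
}
```

A call such as `wait_for_port kafka 9092 120 && dotnet test ...` could then replace an unbounded loop, so that an unreachable broker fails the run with a clear message instead of blocking. Note that an open port only shows that the listener is up; it does not change the StartAsync behaviour described in the answer.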

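I also created a repository that demonstrates the problem and the fix. The main branch contains the source code that shows the problem. The feat/fix branch contains the source code with the fix.
I hope this helps anyone else who runs into a similar problem!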
