docker Kafka的集成测试在第一次读取时失败,但随后可以正常工作

2ic8powd  于 2023-08-03  发布在  Docker
关注(0)|答案(1)|浏览(114)

我有一个自定义函数forward_to_kafka(list: List),即使有问题,它也会把我的事件发送到Kafka,它的目的是传递我列表中的所有消息。我已经用JB插件大数据工具测试过了,它工作正常。
但现在我需要编写自动化集成测试,我已经有了这个:

@pytest.mark.parametrize("topic, hits", [
        ("topic1", []),
        ("topic3", ["ev1", "ev2", "ev3"]),
        ("topic2", ["event"]),
        ("topic4", ['{"event": ["arr_elem"]}', '{"event_num": 13}', '{"event": {"subev": "value"}}']),
    ])
    @pytest.mark.integration
    def test_forward_to_kafka_integration(self, topic, hits, output):
        kafka_host = 'localhost:9094'
        producer = KafkaProducer(bootstrap_servers=[kafka_host], acks='all',)
        output.forward_to_kafka(producer, topic, [message.encode() for message in hits])
        consumer = KafkaConsumer(topic, bootstrap_servers=[kafka_host],
                                 group_id=f"{topic}_grp", auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        received_messages = [message.value.decode() for message in consumer]
        print(received_messages)
        assert all([message in received_messages for message in hits])

字符串
我在docker容器中有一个Kafka,连接正常,但第二次测试总是失败。更正确的是,如果我运行空Kafka,没有保存数据,只有清晰的图像,第一个动作的push测试总是失败。
执行第2次测试的结果如下:

FAILED [ 50%][]

test_output.py:90 (TestForwardToKafka.test_forward_to_kafka_integration[topic3-hits1])
self = <tests.test_output.TestForwardToKafka object at 0x107b49520>
topic = 'topic3', hits = ['ev1', 'ev2', 'ev3']
output = <h3ra.output.Output object at 0x107c14b80>

    @pytest.mark.parametrize("topic, hits", [
        ("topic1", []),
        ("topic3", ["ev1", "ev2", "ev3"]),
        ("topic2", ["event"]),
        ("topic4", ['{"event": ["arr_elem"]}', '{"event_num": 13}', '{"event": {"subev": "value"}}']),
    ])
    @pytest.mark.integration
    def test_forward_to_kafka_integration(self, topic, hits, output):
        kafka_host = 'localhost:9094'
        producer = KafkaProducer(bootstrap_servers=[kafka_host], acks='all',)
        output.forward_to_kafka(producer, topic, [message.encode() for message in hits])
        consumer = KafkaConsumer(topic, bootstrap_servers=[kafka_host],
                                 group_id=f"{topic}_grp", auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        received_messages = [message.value.decode() for message in consumer]
        print(received_messages)
>       assert all([message in received_messages for message in hits])
E       assert False
E        +  where False = all([False, False, False])

test_output.py:107: AssertionError


如您所见,包含received_messages的数组为空。
但是当我连接到大数据工具的Kafka插件时,我看到这些消息已经被传递了。
我做错了什么?

编辑

要复制,可以将output.forward_to_kafka(producer, topic, [message.encode() for message in hits])替换为

for message in hits:
    producer.send(topic, message)
producer.flush()


共同点就是,我的功能都在做什么。
这是我的docker-composit.yml文件

version: "3.9"

services:
  kafka: # DNS-1035
    hostname: kafka
    image: docker-proxy.artifactory.tcsbank.ru/bitnami/kafka:3.5
    expose:
      - "9092"
      - "9093"
      - "9094"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://kafka:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_PROCESS_ROLES=controller,broker

volumes:
  kafka_data:
    driver: local

zlhcx6iw

zlhcx6iw1#

您需要将通告的侦听器设置为包含EXTERNAL://localhost:9094。您当前有两个完全相同的侦听器...并设置Map9094:9094,以便主机中不使用随机端口
您还可以从暴露的Map中删除端口9092和9093,因为主机永远不会使用它们来访问Kafka服务

相关问题