I have a custom function forward_to_kafka(list: List) that sends my events to Kafka even if something goes wrong; its purpose is to deliver every message from my list. I have already tested it with the JetBrains Big Data Tools plugin and it works fine.
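The implementation of forward_to_kafka is not shown here, but based on the description (and the equivalent snippet in the edit below) a minimal sketch of it could look roughly like this; the class name, signature and types are assumptions:

from typing import List
from kafka import KafkaProducer


class Output:
    def forward_to_kafka(self, producer: KafkaProducer, topic: str, messages: List[bytes]) -> None:
        # Hypothetical sketch: queue every message from the list for the given topic...
        for message in messages:
            producer.send(topic, message)
        # ...then block until everything buffered has actually been handed to the broker.
        producer.flush()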
But now I need to write automated integration tests, and this is what I have so far:
@pytest.mark.parametrize("topic, hits", [
    ("topic1", []),
    ("topic3", ["ev1", "ev2", "ev3"]),
    ("topic2", ["event"]),
    ("topic4", ['{"event": ["arr_elem"]}', '{"event_num": 13}', '{"event": {"subev": "value"}}']),
])
@pytest.mark.integration
def test_forward_to_kafka_integration(self, topic, hits, output):
    kafka_host = 'localhost:9094'
    producer = KafkaProducer(bootstrap_servers=[kafka_host], acks='all',)
    output.forward_to_kafka(producer, topic, [message.encode() for message in hits])
    consumer = KafkaConsumer(topic, bootstrap_servers=[kafka_host],
                             group_id=f"{topic}_grp", auto_offset_reset='earliest',
                             consumer_timeout_ms=1000)
    received_messages = [message.value.decode() for message in consumer]
    print(received_messages)
    assert all([message in received_messages for message in hits])
I have Kafka running in a Docker container and the connection works, but the second test always fails. More precisely: if I start from an empty Kafka with no persisted data (a clean image), the first test that actually pushes messages always fails.
The output of that second test is:
FAILED [ 50%][]
test_output.py:90 (TestForwardToKafka.test_forward_to_kafka_integration[topic3-hits1])
self = <tests.test_output.TestForwardToKafka object at 0x107b49520>
topic = 'topic3', hits = ['ev1', 'ev2', 'ev3']
output = <h3ra.output.Output object at 0x107c14b80>

    @pytest.mark.parametrize("topic, hits", [
        ("topic1", []),
        ("topic3", ["ev1", "ev2", "ev3"]),
        ("topic2", ["event"]),
        ("topic4", ['{"event": ["arr_elem"]}', '{"event_num": 13}', '{"event": {"subev": "value"}}']),
    ])
    @pytest.mark.integration
    def test_forward_to_kafka_integration(self, topic, hits, output):
        kafka_host = 'localhost:9094'
        producer = KafkaProducer(bootstrap_servers=[kafka_host], acks='all',)
        output.forward_to_kafka(producer, topic, [message.encode() for message in hits])
        consumer = KafkaConsumer(topic, bootstrap_servers=[kafka_host],
                                 group_id=f"{topic}_grp", auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        received_messages = [message.value.decode() for message in consumer]
        print(received_messages)
>       assert all([message in received_messages for message in hits])
E       assert False
E        +  where False = all([False, False, False])

test_output.py:107: AssertionError
As you can see, the received_messages list is empty.
But when I connect with the Big Data Tools Kafka plugin, I can see that the messages were in fact delivered.
What am I doing wrong?
EDIT
To reproduce this, you can replace
output.forward_to_kafka(producer, topic, [message.encode() for message in hits])
with

for message in hits:
    producer.send(topic, message)
producer.flush()
which is essentially what my function does.
Here is my docker-compose.yml file:
version: "3.9"
services:
  kafka: # DNS-1035
    hostname: kafka
    image: docker-proxy.artifactory.tcsbank.ru/bitnami/kafka:3.5
    expose:
      - "9092"
      - "9093"
      - "9094"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://kafka:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
volumes:
  kafka_data:
    driver: local
1 answer
You need to set the advertised listeners so that they include EXTERNAL://localhost:9094 — right now both of your listeners advertise the same hostname (kafka), which is only resolvable inside the Docker network. You also need to add a 9094:9094 port mapping so that a random port is not used on the host.
You can also remove ports 9092 and 9093 from the expose list, since the host will never use them to reach the Kafka service.
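Under that assumption, a minimal sketch of the relevant docker-compose.yml changes might look like this (only the keys that change are shown; everything else stays as in the question):

services:
  kafka:
    ports:
      - "9094:9094"   # fixed host-to-container mapping instead of a random host port
    environment:
      # EXTERNAL must advertise an address that is resolvable from the host running the tests:
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094

With that in place the test can keep using bootstrap_servers=['localhost:9094'].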