I have implemented a Kafka application using the Consumer API, and I have two regression tests implemented with the Streams API:
Test the happy path: the test produces the data that the application will consume (into the input topic the application listens on); the application then produces data (into the output topic) that the test consumes and validates against the expected output.
Test the error path: behavior is as above, although this time the test consumes from the application's error topic and validates against the expected error output.
My code and the regression-test code live in the same project under the expected directory structure. In both cases (for both tests) the data should be picked up by the same listener on the application side.
The problem:
When I execute the tests individually (manually), each test passes. However, when I execute them together but sequentially (e.g. gradle clean build), only the first test passes. The second test fails after the test-side consumer polls for data, and after a while it gives up without finding any.
Observation:
From debugging, everything appears to work perfectly the first time (test-side and application-side producers and consumers). During the second test, however, the application-side consumer does not seem to receive any data (the test-side producer appears to be producing, but I cannot be sure), and consequently no data is produced into the error topic.
What I have tried so far:
After investigating, my understanding is that we are running into a race condition, and to avoid it I found suggestions such as:
use @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
tear down the broker after each test (see ".destroy()" on the broker)
use different topic names for each test
I applied all of these, but still could not resolve my issue.
I am providing the code here for reference. Any insight is appreciated.
Code for the first test (error-path test):
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
    partitions = 1,
    controlledShutdown = false,
    topics = {
        AdapterStreamProperties.Constants.INPUT_TOPIC,
        AdapterStreamProperties.Constants.ERROR_TOPIC
    },
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:9092",
        "port=9092",
        "log.dir=/tmp/data/logs",
        "auto.create.topics.enable=true",
        "delete.topic.enable=true"
    }
)
public class AbstractIntegrationFailurePathTest {

    private final int retryLimit = 0;

    @Autowired
    protected EmbeddedKafkaBroker embeddedFailurePathKafkaBroker;

    // To produce data
    @Autowired
    protected KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate;

    // To read from the error output
    @Autowired
    protected Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer;

    // Service to execute notification-preference
    @Autowired
    protected AdapterStreamProperties projectProerties;

    protected void subscribe(Consumer consumer, String topic, int attempt) {
        try {
            embeddedFailurePathKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
        } catch (ComparisonFailure ex) {
            if (attempt < retryLimit) {
                subscribe(consumer, topic, attempt + 1);
            }
        }
    }
}
@TestConfiguration
public class AdapterStreamFailurePathTestConfig {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${spring.kafka.adapter.application-id}")
    private String applicationId;

    @Value("${spring.kafka.adapter.group-id}")
    private String groupId;

    // Producer of the records that the application consumes
    @Bean
    public Map<String, Object> sendEmailCmdProducerConfigs() {
        Map<String, Object> results = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        results.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.KEY_SERDE.serializer().getClass());
        results.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.INPUT_VALUE_SERDE.serializer().getClass());
        return results;
    }

    @Bean
    public ProducerFactory<PreferredMediaMsgKey, SendEmailCmd> inputProducerFactory() {
        return new DefaultKafkaProducerFactory<>(sendEmailCmdProducerConfigs());
    }

    @Bean
    public KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate() {
        return new KafkaTemplate<>(inputProducerFactory());
    }

    // Consumer of the error output generated by the application
    @Bean
    public Map<String, Object> outputErrorConsumerConfig() {
        Map<String, Object> props = KafkaTestUtils.consumerProps(
                applicationId, Boolean.TRUE.toString(), embeddedKafkaBroker);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.KEY_SERDE.deserializer().getClass().getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.ERROR_VALUE_SERDE.deserializer().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer() {
        DefaultKafkaConsumerFactory<PreferredMediaMsgKey, ErrorCmd> rpf =
                new DefaultKafkaConsumerFactory<>(outputErrorConsumerConfig());
        return rpf.createConsumer(groupId, "notification-failure");
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest(classes = AdapterStreamFailurePathTestConfig.class)
@ActiveProfiles(profiles = "errtest")
public class ErrorPath400Test extends AbstractIntegrationFailurePathTest {

    @Autowired
    private DataGenaratorForErrorPath400Test datagen;

    @Mock
    private AdapterHttpClient httpClient;

    @Autowired
    private ErroredEmailCmdDeserializer erroredEmailCmdDeserializer;

    @Before
    public void setup() throws InterruptedException {
        Mockito.when(httpClient.callApi(Mockito.any()))
                .thenReturn(new GenericResponse(400, TestConstants.ERROR_MSG_TO_CHK));
        Mockito.when(httpClient.createURI(Mockito.any(), Mockito.any(), Mockito.any()))
                .thenCallRealMethod();
        inputProducerTemplate.send(
                projectProerties.getInputTopic(),
                datagen.getKey(),
                datagen.getEmailCmdToProduce());
        System.out.println("producer: " + projectProerties.getInputTopic());
        subscribe(outputErrorConsumer, projectProerties.getErrorTopic(), 0);
    }

    @Test
    public void testWithError() throws InterruptedException, InvalidProtocolBufferException, TextFormat.ParseException {
        ConsumerRecords<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd> records;
        List<ConsumerRecord<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd>> outputListOfErrors = new ArrayList<>();
        int attempt = 0;
        int expectedRecords = 1;
        do {
            records = KafkaTestUtils.getRecords(outputErrorConsumer);
            records.forEach(outputListOfErrors::add);
            attempt++;
        } while (attempt < expectedRecords && outputListOfErrors.size() < expectedRecords);
        // Verify the recipient event stream size
        Assert.assertEquals(expectedRecords, outputListOfErrors.size());
        // Validate output
    }

    @After
    public void tearDown() {
        outputErrorConsumer.close();
        embeddedFailurePathKafkaBroker.destroy();
    }
}
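As an aside on the polling loop above: the do/while bounds retries with `expectedRecords`, so with one expected record it polls exactly once. A more forgiving loop, sketched using only the plain Kafka `Consumer.poll(Duration)` API (`maxPolls` is a hypothetical retry budget, not from the original code), would keep polling until the expected records arrive or the budget runs out:

```java
// Sketch only: bounded polling that retries until the expected number
// of records arrives. Assumes the same test fields as above.
List<ConsumerRecord<PreferredMediaMsgKey, ErrorCmd>> received = new ArrayList<>();
final int expectedRecords = 1;
final int maxPolls = 5; // hypothetical retry budget
for (int polls = 0; polls < maxPolls && received.size() < expectedRecords; polls++) {
    // Each poll waits up to 5 seconds for a batch from the error topic.
    outputErrorConsumer.poll(Duration.ofSeconds(5)).forEach(received::add);
}
Assert.assertEquals(expectedRecords, received.size());
```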
The second test is structurally almost identical, although this time the test-side consumer consumes from the application-side output topic (instead of the error topic). I gave the consumer, broker, producer, and topics different names. For example:
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
    partitions = 1,
    controlledShutdown = false,
    topics = {
        AdapterStreamProperties.Constants.INPUT_TOPIC,
        AdapterStreamProperties.Constants.OUTPUT_TOPIC
    },
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:9092",
        "port=9092",
        "log.dir=/tmp/data/logs",
        "auto.create.topics.enable=true",
        "delete.topic.enable=true"
    }
)
public class AbstractIntegrationSuccessPathTest {

    private final int retryLimit = 0;

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafkaBroker;

    // To produce data
    @Autowired
    protected KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> sendEmailCmdProducerTemplate;

    // To read from the regular output topic
    @Autowired
    protected Consumer<PreferredMediaMsgKey, NotifiedEmailCmd> ouputConsumer;

    // Service to execute notification-preference
    @Autowired
    protected AdapterStreamProperties projectProerties;

    protected void subscribe(Consumer consumer, String topic, int attempt) {
        try {
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
        } catch (ComparisonFailure ex) {
            if (attempt < retryLimit) {
                subscribe(consumer, topic, attempt + 1);
            }
        }
    }
}
Let me know if I need to provide more information.
2 Answers
#1
“端口=9092”
Do not use a fixed port; leave it out and the embedded broker will use a random port. The consumer configs built by
KafkaTestUtils
point to that random port. You should not need to dirty the context after each test method; use a different
group.id
for each test, and different topics.
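A minimal sketch of this suggestion, under the assumption that dropping the fixed `listeners`/`port` broker properties lets `EmbeddedKafkaBroker` bind a random free port, and that `KafkaTestUtils.consumerProps(...)` wires that port in; the group id shown is illustrative, not from the original post:

```java
// No "listeners"/"port" brokerProperties: the embedded broker binds a
// random free port that KafkaTestUtils resolves from the broker instance.
@EmbeddedKafka(
    partitions = 1,
    topics = {
        AdapterStreamProperties.Constants.INPUT_TOPIC,
        AdapterStreamProperties.Constants.ERROR_TOPIC
    }
)
public class AbstractIntegrationFailurePathTest {
    // ...
}

// A distinct group.id per test class ("failure-path-group" is a made-up
// example) keeps the two tests from interfering with each other's offsets.
Map<String, Object> props = KafkaTestUtils.consumerProps(
        "failure-path-group", "true", embeddedKafkaBroker);
```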
#2
In my case, the consumer was not being closed properly. Closing it correctly solved the problem.
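The answer's own snippet is not preserved here; a plausible sketch of a clean shutdown, reusing the field names from the question's code, might look like:

```java
@After
public void tearDown() {
    // Closing the consumer releases its group membership and network
    // resources; a leaked consumer can leave the next test's polls empty.
    if (outputErrorConsumer != null) {
        outputErrorConsumer.close();
    }
    embeddedFailurePathKafkaBroker.destroy();
}
```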