apachekafka:生产者没有生成所有数据

j8yoct9x  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(514)

我是Kafka的新人。我的要求是,在数据库源和目标中有两个表。现在我想从源表中获取数据并将其存储到目标表中,Kafka将作为生产者和消费者工作。我已经完成了代码,但问题是当producer生成数据时,会丢失一些要生成的数据。例如,如果我在源表中有100条记录,那么它不会生成所有100条记录。我用的是Kafka-0.10
myproducer配置-

  1. bootstrap.servers=192.168.1.XXX:9092,192.168.1.XXX:9093,192.168.1.XXX:9094
  2. acks=all
  3. retries=2
  4. batch.size=16384
  5. linger.ms=2
  6. buffer.memory=33554432
  7. key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
  8. value.serializer=org.apache.kafka.common.serialization.StringSerializer

我的制作人code:-

  1. public void run() {
  2. SourceDAO sourceDAO = new SourceDAO();
  3. Source source;
  4. int id;
  5. try {
  6. logger.debug("INSIDE RUN");
  7. List<Source> listOfEmployee = sourceDAO.getAllSource();
  8. Iterator<Source> sourceIterator = listOfEmployee.iterator();
  9. String sourceJson;
  10. Gson gson = new Gson();
  11. while(sourceIterator.hasNext()) {
  12. source = sourceIterator.next();
  13. sourceJson = gson.toJson(source);
  14. id = source.getId();
  15. producerRecord = new ProducerRecord<Integer, String>(TOPIC, id, sourceJson);
  16. producerRecords.add(producerRecord);
  17. }
  18. for(ProducerRecord<Integer, String> record : producerRecords) {
  19. logger.debug("Producer Record: " + record.value());
  20. producer.send(record, new Callback() {
  21. @Override
  22. public void onCompletion(RecordMetadata metadata, Exception exception) {
  23. logger.debug("Exception: " + exception);
  24. if (exception != null)
  25. throw new RuntimeException(exception.getMessage());
  26. logger.info("The offset of the record we just sent is: " + metadata.offset()
  27. + " In Partition : " + metadata.partition());
  28. }
  29. });
  30. }
  31. producer.close();
  32. producer.flush();
  33. logger.info("Size of Record: " + producerRecords.size());
  34. } catch (SourceServiceException e) {
  35. logger.error("Unable to Produce data...", e);
  36. throw new RuntimeException("Unable to Produce data...", e);
  37. }
  38. }

我的消费者config:-

  1. bootstrap.servers=192.168.1.XXX:9092,192.168.1.231:XXX,192.168.1.232:XXX
  2. group.id=consume
  3. client.id=C1
  4. enable.auto.commit=true
  5. auto.commit.interval.ms=1000
  6. max.partition.fetch.bytes=10485760
  7. session.timeout.ms=35000
  8. consumer.timeout.ms=35000
  9. auto.offset.reset=earliest
  10. message.max.bytes=10000000
  11. key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

value.deserializer=org.apache.kafka.common.serialization.stringdeserializer
消费者code:-

  1. public void doWork() {
  2. logger.debug("Inside doWork of DestinationConsumer");
  3. DestinationDAO destinationDAO = new DestinationDAO();
  4. consumer.subscribe(Collections.singletonList(this.TOPIC));
  5. while(true) {
  6. ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
  7. int minBatchSize = 1;
  8. for(ConsumerRecord<String, String> rec : consumerRecords) {
  9. logger.debug("Consumer Recieved Record: " + rec);
  10. consumerRecordsList.add(rec);
  11. }
  12. logger.debug("Record Size: " + consumerRecordsList.size());
  13. if(consumerRecordsList.size() >= minBatchSize) {
  14. try {
  15. destinationDAO.insertSourceDataIntoDestination(consumerRecordsList);
  16. } catch (DestinationServiceException e) {
  17. logger.error("Unable to update destination table");
  18. }
  19. }
  20. }
  21. }
ygya80vv

ygya80vv1#

从这里可以看到的情况来看,我猜你没有冲洗或关闭生产商。您应该注意,send是异步运行的,只是准备一个稍后发送的批处理(取决于生产者的配置):
从Kafka文献中
send()方法是异步的。调用时,它将记录添加到挂起记录发送的缓冲区中,并立即返回。这允许制作者将单个记录批处理在一起以提高效率。
你应该试着打电话 producer.close() 在您遍历所有producerrecords之后(顺便说一句:为什么要缓存整个producerrecords,当您有许多记录时可能会导致问题)。
如果这对你没有帮助,你应该试着使用一个例如控制台消费者来找出缺少什么。请提供更多的代码。生产商是如何配置的?你的消费者看起来怎么样?生产记录的类型是什么?
希望有帮助。

相关问题