Kafka消费者陷入困境

oxf4rvwz  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(278)

我们使用kafka流插入到postgresql中,因为流太高,所以避免了直接插入。消费者似乎工作得很好,但偶尔会陷入困境,找不到相同的根本原因。
消费者已经运行了大约6个月,已经消耗了数十亿张唱片。我不明白为什么它最近被卡住了。我甚至不知道从哪里开始调试。
下面是处理记录的代码:`private static void readfromtopic(datasource datasource,consumeroptions options){

KafkaConsumer<String, String> consumer = KafkaConsumerConfig.createConsumerGroup(options);
    Producer<Long, String> producer = KafkaProducerConfig.createKafkaProducer(options);

    if (options.isReadFromAnOffset()) {
        // if want to assign particular offsets to consume from
        // will work for only a single partition for a consumer
        List<TopicPartition> tpartition = new ArrayList<TopicPartition>();
        tpartition.add(new TopicPartition(options.getTopicName(), options.getPartition()));
        consumer.assign(tpartition);
        consumer.seek(tpartition.get(0), options.getOffset());
    } else {
        // use auto assign partition & offsets
        consumer.subscribe(Arrays.asList(options.getTopicName()));
        log.debug("subscribed to topic {}", options.getTopicName());
    }

    List<Payload> payloads = new ArrayList<>();

    while (true) {

        // timer is the time to wait for messages to be received in the broker
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(50));

        if(records.count() != 0 )
        log.debug("poll size is {}", records.count());
        Set<TopicPartition> partitions = records.partitions();
        // reading normally as per round robin and the last committed offset
        for (ConsumerRecord<String, String> r : records) {

            log.debug(" Parition : {} Offset : {}", r.partition(), r.offset());

            try {

                JSONArray arr = new JSONArray(r.value());
                for (Object o : arr) {
                    Payload p = JsonIterator.deserialize(((JSONObject) o).toString(), Payload.class);
                    payloads.add(p);
                }

                List<Payload> steplist = new ArrayList<>();
                steplist.addAll(payloads);

                // Run a task specified by a Runnable Object asynchronously.
                CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
                    @Override
                    public void run() {

                        try {
                            Connection conn = datasource.getConnection();

                            PgInsert.insertIntoPg(steplist, conn, consumer, r, options.getTopicName(),
                                    options.getErrorTopic(), producer);

                        } catch (Exception e) {

                            log.error("error in processing future {}", e);
                        }

                    }
                }, executorService);

                // used to combine all futures
                allfutures.add(future);

                payloads.clear();

            } catch (Exception e) {

                // pushing into new topic for records which have failed
                log.debug("error in kafka consumer {}", e);
                ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(options.getErrorTopic(),
                        r.offset(), r.value());
                producer.send(record);

            }
        }

        // commiting after every poll
        consumer.commitSync();

        if (records.count() != 0) {
            Map<TopicPartition, OffsetAndMetadata> metadata = consumer.committed(partitions);

            // reading the committed offsets for each partition after polling
            for (TopicPartition tpartition : partitions) {
                OffsetAndMetadata offsetdata = metadata.get(tpartition);

                if (offsetdata != null && tpartition != null)
                    log.debug("committed offset is " + offsetdata.offset() + "  for topic partition "
                            + tpartition.partition());
            }
        }

        // waiting for all threads to complete after each poll
        try {
            waitForFuturesToEnd();
            allfutures.clear();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }
}`

之前我认为它卡住的原因是被消耗的记录的大小,所以我减少了 MAX_POLL_RECORDS_CONFIG10 . 这将确保在轮询中获取的记录不会超过200kb,因为每个记录的最大大小可以是20kb。
正在考虑使用spring框架来解决这个问题,但在此之前,我想知道消费者到底为什么会陷入困境。对此的任何见解都会有所帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题