带有kafka消费记录的并行流

8yoxcaq7  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(275)

我有Kafka的记录:

ConsumerRecords<String, Events> records = kafkaConsumer.poll(POLL_TIMEOUT);

我想使用并行流而不是多线程来运行下面的代码。

records.forEach((record) -> {
                Event event = record.value();

                       HTTPSend.send(event);

            });

我尝试了mlutithreading,但我想尝试parallelstream:

for (ConsumerRecord<String, Event> record : records) {
                        executor.execute(new Runnable() {
                            @Override
                            public void run() {

                                        HTTPSend.send(Event);

                            }
                        });

                    }

实际上,我在使用多线程的http.send时遇到了一个问题(即使使用1个线程的线程池)。我要走了 "Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target" . 这是通过https的请求。此错误仅在第一次发出请求时出现。之后,例外消失了。噗!
对于我使用的多线程:

int threadCOunt=1;
                BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(threadCOunt, true);
                RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
                ExecutorService executor = new ThreadPoolExecutor(threadCOunt, threadCOunt, 0L, TimeUnit.MILLISECONDS, queue, handler);

httpsend.send()是:

long sizeSend = 0;
    SSLContext sc = null;

    try {
        sc = SSLContext.getInstance("TLS");
        sc.init(null, TRUST_ALL_CERTS, new SecureRandom());
    } catch (NoSuchAlgorithmException | KeyManagementException e) {
        LOGGER.error("Failed to create SSL context", e);
    }

    // Ignore differences between given hostname and certificate hostname
    HostnameVerifier hv = (hostname, session) -> true;

    // Create the REST client and configure it to connect meta
    Client client = ClientBuilder.newBuilder()
            .hostnameVerifier(hv)
            .sslContext(sc).build();

    WebTarget baseTarget = client.target(getURL()).path(HTTP_PATH);
    Response jsonResponse = null;

    try {
        StringBuilder eventsBatchString = new StringBuilder();
        eventsBatchString.append(this.getEvent(event));
        Entity<String> entity = Entity.entity(eventsBatchString.toString(), MediaType.APPLICATION_JSON_TYPE);
        builder = baseTarget.request();
        LOGGER.debug("about to send the event {} and URL {}", entity, getURL());
        jsonResponse = builder.header(HTTP_ACK_CHANNEL, guid.toString())
                .header("Content-type", MediaType.APPLICATION_JSON)
                .header("Authorization", String.format("Meta %s", eventsModuleConfig.getSecretKey()))
                .post(entity);
lfapxunr

lfapxunr1#

我知道你想做什么,我不确定这是不是最好的主意(我也不确定这不是)。
这个 poll / commit Kafka模型允许简单的背压和保留最后一个项目处理,如果你崩溃。通过“立即”返回您的轮询循环,您告诉kafka“我准备好接受更多”,并且提交偏移量(手动或自动)告诉kafka您已成功读取到该点。
你想做的是尽可能快地读取Kafka,提交偏移量,然后将Kafka记录放入执行器队列,然后平衡每秒的请求数等等。
我不是100%确定这是个好主意:如果你的应用程序崩溃了怎么办?你可能已经提交了一些Kafka的信息,实际上并没有使它上游。如果您真的想这样做,我建议您手动提交偏移量(通过 commitSync )竣工后 Runnable ,而不是让高级消费者为你做。
为什么要使用线程执行器:我认为这些也可以通过kafka实现。
您可能希望同时向web服务器发布多条消息。一个划分良好的kafka主题将允许多个使用者/使用者组使用多个分区,因此,假设一个完全可伸缩的http服务器,将允许您并行地将消息发布到服务器。是的,基于进程的并发!
可能web服务器不是完全可伸缩的,或者对于这个请求来说速度很慢(假设每个请求需要1秒):您需要限制web服务器每秒的请求数,如果您有一个队列,您可能会有几个线程在不备份kafka的情况下发布。
在这种情况下,可以将max.poll.records设置为web服务器所需的可伸缩值。也许还有更好的方法来做这件事,尽管我现在不知道。
如果您的web服务器需要很长时间来响应,则可能会出现与心跳失败相关的错误。在这种情况下,我会告诉你关于超时/心跳主题的答案。
与使用线程执行器(threadexecutioner)使同步http请求看起来是异步的不同,我将使用netty这样的事件http客户机,从而在没有基于线程的并发的情况下实现并行。

相关问题