Node.js 应用程序通过 http_proxy / https_proxy 代理与 Kafka 通信时出现连接错误

jjjwad0x  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(169)

{"level":"ERROR","timestamp":"2023-08-31T15:17:48.211Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"confluent-cloud-kafka:9092","clientId":"lloydsbank-group"} {"level":"ERROR","timestamp":"2023-08-31T15:17:48.212Z","logger":"kafkajs","message":"[BrokerPool] 无法连接到 seed broker,尝试列表中的另一个 broker:连接超时","retryCount":5,"retryTime":7450} {"level":50,"time":"2023-08-31T15:17:48.213Z","pid":18,"hostname":"k8s-service-69975464ff-z2sgt","name":"svs-k8s-service","msg":"confluent Kafka Kafka broker 连接失败。因此,我们不能消费来自 confluent Kafka Kafka Topic 的消息!!"} {"level":50,"time":"2023-08-31T15:17:48.213Z","pid":18,"hostname":"k8s-service-69975464ff-z2sgt","name":"svs-k8s-service","msg":"confluent Kafka Kafka error {\"name\":\"KafkaJSNumberOfRetriesExceeded\",\"retriable\":false,\"retryCount\":5,\"retryTime\":7450}"}返回-/返回--通知/oapi(POST)
{"level":30,"time":"2023-08-31T15:17:48.227Z","pid":18,"hostname":"k8s-service-69975464ff-z2sgt","name":"svs-k8s-service","msg":"Server listening at {"level":"ERROR","timestamp":"2023-08-31T15:17:48.673Z","logger":"kafkajs","message":"[Connection] 连接错误:read ECONNRESET","broker":"confluent-cloud-kafka:9092","clientId":"lloydsbank-group","stack":"Error: read ECONNRESET at TLSWrap.onStreamRead (node:internal/stream_base_commons:217:20)
如何让 Node.js 的 kafkajs 通过 http_proxy 或 https_proxy 代理建立连接?

qpgpyjmq

qpgpyjmq1#

'use strict';
const fp = require('fastify-plugin');
const { Kafka } = require('kafkajs');
const tls = require('tls')
const net = require('net')
const { createTunnel, closeTunnel } = require('proxy-chain');

/**
 * Builds a KafkaJS `socketFactory` that routes every broker connection through
 * the HTTP proxy named by the HTTP_PROXY environment variable.
 *
 * For each connection a local proxy-chain tunnel to `host:port` is created and
 * the socket is connected to that tunnel on localhost. The tunnel is closed
 * again when the socket closes, so reconnect attempts do not accumulate
 * tunnels.
 *
 * @param {{ error: Function }} log - Logger used to report tunnel clean-up failures.
 * @returns {Function} A factory taking `{ host, port, ssl, onConnect }` and returning a socket.
 */
function createProxiedSocketFactory(log) {
    return ({ host, port, ssl, onConnect }) => {
        // The socket has to be returned synchronously, so it is created
        // unconnected here and connected once the tunnel is available.
        // NOTE(review): this relies on `new tls.TLSSocket()` + `connect()`
        // negotiating TLS like `tls.connect()` does — confirm against a real broker.
        const socket = ssl ? new tls.TLSSocket() : new net.Socket();

        const connectThroughTunnel = async () => {
            const tunnelAddress = await createTunnel(process.env.HTTP_PROXY, `${host}:${port}`);

            const releaseTunnel = () => {
                Promise.resolve()
                    .then(() => closeTunnel(tunnelAddress, true))
                    .catch((error) => log.error(error));
            };

            // The caller may have given up while the tunnel was being set up.
            if (socket.destroyed) {
                releaseTunnel();
                return;
            }
            socket.once('close', releaseTunnel);

            // Only the port of the tunnel address is used; the host is pinned to localhost.
            const tunnelPort = Number(tunnelAddress.split(':').pop());

            socket.setKeepAlive(true, 60000);
            socket.connect(
                // `servername` keeps the broker's name for TLS although the TCP
                // peer is the local tunnel. `ssl` may be `true` (contributes
                // nothing) or an options object (merged last).
                Object.assign({ host: 'localhost', port: tunnelPort, servername: host }, ssl),
                onConnect
            );
        };

        // Being async, connectThroughTunnel turns a synchronous throw from
        // createTunnel into a rejection, so every failure surfaces as a
        // socket 'error' event instead of escaping the factory.
        connectThroughTunnel().catch((error) => socket.emit('error', error));

        return socket;
    };
}

/**
 * Fastify plugin that connects a KafkaJS consumer (SASL/PLAIN over TLS, via an
 * HTTP proxy tunnel), subscribes to the configured topic and forwards every
 * message value to `fastify.lbg_kafka_producer`.
 *
 * The plugin is promise-based: it must not take or call a `done` callback,
 * because mixing both styles is flagged by Fastify. A failure to connect or
 * subscribe is logged and swallowed so the rest of the application still starts.
 *
 * @param {object} fastify - Fastify instance providing `config`, `log` and `lbg_kafka_producer`.
 * @param {object} opts - Plugin options (unused).
 * @returns {Promise<void>}
 */
async function KafkaPlugin(fastify, opts) {
    const kafka = new Kafka({
        clientId: fastify.config.KAFKA_CLIENT_ID,
        brokers: [fastify.config.KAFKA_BOOTSTRAP_SERVER],
        //  - SASL - tested on local dev-mac, using ssl true only getting msgs
        ssl: true,
        sasl: {
            mechanism: 'plain',
            username: fastify.config.KAFKA_CLIENT_SASL_API_KEY,
            password: fastify.config.KAFKA_CLIENT_SASL_API_SECRET
        },
        socketFactory: createProxiedSocketFactory(fastify.log)
    });
    const kafkaTopic = fastify.config.KAFKA_CONSUMER_TOPIC_NAME;

    try {
        const consumer = kafka.consumer({ groupId: fastify.config.KAFKA_CLIENT_ID });
        await consumer.connect();
        fastify.log.trace(' Kafka Borker connected successfully!!');

        await consumer.subscribe({ topic: kafkaTopic, fromBeginning: true });
        fastify.log.trace(` Kafka Topic - ${kafkaTopic} subscribed successfully!!`);

        await consumer.run({
            // NOTE(review): offsets are never committed anywhere in this block;
            // together with fromBeginning this presumably replays the topic on
            // every restart — confirm that is intended.
            autoCommit: false,
            eachMessage: async ({ topic, partition, message }) => {
                const payload = message.value.toString();

                console.log({
                    topic,
                    partition,
                    offset: message.offset,
                    // The timestamp is a millisecond epoch, which is what Date expects.
                    produced_datetime: new Date(Number(message.timestamp)).toISOString(),
                    value: payload,
                });

                // Publish the  messages into LBG Kafka Topic
                await fastify.lbg_kafka_producer(payload);
            },
        });
    } catch (error) {
        fastify.log.error(' Kafka broker connnection failed. So, we cannot consume messages from  Kafka Topic!!');
        fastify.log.error(error);
    }
}
// Export KafkaPlugin wrapped by fastify-plugin (`fp`) under the plugin name 'KafkaConsumer'.
module.exports = fp(KafkaPlugin, { name: 'KafkaConsumer' });

相关问题