"Client network socket disconnected before secure TLS connection was established": how do I connect to a Kafka cluster with KafkaJS in Node.js?

Posted by ezykj2lf on 2024-01-06 in Apache

These are the credentials I was given (I also have Kafka.keystore.jks and Kafka.truststore.jks):

    host: xxxxx-xxxxx-x.cloudclusters.net
    port: xxxxx
    ip: xxx.xxx.xxx.xx
    truststore pw: xxxxxxxx
    keystore pw: xxxxxxxx

I don't think I'm using all of the credentials I was given.

    import * as dotenv from 'dotenv';
    import express from 'express';
    import { Kafka, Partitioners } from 'kafkajs';
    import jks from 'jks-js';
    import fs from 'fs';

    dotenv.config();
    const app = express();

    // Convert both JKS stores to PEM
    const keystore = jks.toPem(
      fs.readFileSync('./kafka.keystore.jks'),
      'mypassword'
    );
    const truststore = jks.toPem(
      fs.readFileSync('./kafka.truststore.jks'),
      'mypassword'
    );

    // Pull the CA, private key and certificate out of the keystore aliases
    const {
      caroot: { ca },
      localhost: { key, cert },
    } = keystore;
    // const { caroot: { ca } } = truststore;

    console.log('**************** kafka.keystore.jks ****************');
    // console.log(keystore)
    console.log('ca ===>', ca);
    console.log('key ===>', key);
    console.log('cert ===>', cert);
    console.log('**************** kafka.truststore.jks ****************');

    // setting up kafka
    const kafka = new Kafka({
      clientId: 'qa-topic',
      brokers: ['xxxxxxx.cloudclusters.net:xxxxx'], // HOST:PORT
      ssl: {
        rejectUnauthorized: false, // note: this disables broker certificate verification
        ca: ca,
        key: key,
        cert: cert,
      },
    });

    const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });
    // groupId is required by kafka.consumer(); 'qa-group' is a placeholder
    const consumer = kafka.consumer({ groupId: 'qa-group' });

    producer.on('producer.connect', () => {
      console.log('KafkaProvider: connected');
    });
    producer.on('producer.disconnect', () => {
      console.log('KafkaProvider: could not connect');
    });
    producer.on('producer.network.request_timeout', (payload) => {
      console.log(`KafkaProvider: request timeout ${payload.clientId}`);
    });

    // Placeholders for values defined elsewhere in my app (not shown)
    const user_uuiD = 'xxxxxxxx';
    const i = { supplier_id: 'xxxxx' };

    const run = async () => {
      // Producing
      await producer.connect();
      await producer.send({
        topic: 'supplier-ratings',
        messages: [
          {
            value: Buffer.from(JSON.stringify({
              event_name: 'QA',
              external_id: user_uuiD,
              payload: {
                supplier_id: i.supplier_id,
                assessment: {
                  performance: 7,
                  quality: 7,
                  communication: 7,
                  flexibility: 7,
                  cost: 7,
                  delivery: 6,
                },
              },
              metadata: {
                user_uuid: '5a12cba8-f4b5-495b-80ea-d0dd5d4ee17e',
              },
            })),
          },
        ],
      });

      // Consuming
      await consumer.connect();
      await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
      await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          console.log({
            partition,
            offset: message.offset,
            value: message.value.toString(),
          });
        },
      });
    };

    run().catch(console.error);

    const port = process.env.PORT || 5000;
    app.listen(port, () => {
      console.log(`I am listening at ${port}`);
    });


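I extracted ca, key and cert from my kafka.keystore.jks and passed them in the ssl object as the documentation describes, but I still get the error "Client network socket disconnected before secure TLS connection was established".
I can't establish a connection to the Kafka cluster. I suspect I'm missing some key. I'm following the KafkaJS documentation.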
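
Since the question mentions that not all of the provided credentials are used, here is a minimal sketch of one way to build the ssl object from both stores: key and cert from the keystore, CA certificates from the truststore. It is not a confirmed fix for the error. The helper name buildSslOptions is hypothetical, and the code assumes the maps have the shape that jks-js toPem appears to return ({ alias: { key, cert } } for private-key entries, { alias: { ca } } for trusted certificates), which matches the destructuring used in the question.

```javascript
// Sketch only: buildSslOptions is a hypothetical helper, not part of KafkaJS or jks-js.
// Assumed input shape (as produced by jks-js `toPem`, per the question's destructuring):
//   private-key entry:   { [alias]: { key, cert } }
//   trusted-cert entry:  { [alias]: { ca } }
function buildSslOptions(keystorePem, truststorePem, keyAlias) {
  const entry = keystorePem[keyAlias];
  if (!entry || !entry.key || !entry.cert) {
    // Listing the aliases helps spot a wrong alias name early
    const aliases = Object.keys(keystorePem).join(', ') || '(none)';
    throw new Error(`No key/cert under alias "${keyAlias}"; aliases found: ${aliases}`);
  }

  // Collect every trusted CA certificate found in the truststore
  const ca = Object.values(truststorePem)
    .map((e) => e.ca)
    .filter(Boolean);
  if (ca.length === 0) {
    throw new Error('Truststore contains no CA certificates');
  }

  return {
    rejectUnauthorized: true, // verify the broker certificate against `ca`
    ca,                       // Node's tls module accepts an array of PEM strings
    key: entry.key,
    cert: entry.cert,
  };
}

// Usage (assuming both stores were already converted with jks.toPem):
//   const ssl = buildSslOptions(keystorePem, truststorePem, 'localhost');
//   const kafka = new Kafka({ clientId: 'qa-topic', brokers: ['HOST:PORT'], ssl });
```

The sketch sets rejectUnauthorized to true, unlike the question's code; whether that works depends on the broker certificate matching the host name being connected to.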

nfs0ujit #1

Increasing connectionTimeout from its default (1000 ms) to 25000 fixed this for me.

    const kafka = new Kafka({
      // ...your existing configuration (clientId, brokers, ssl, etc.)
      connectionTimeout: 25000, // milliseconds
    });
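
If the timeout needs tuning per environment, the same idea can be sketched as a small helper that merges connectionTimeout into an existing config. The function name withConnectionTimeout and the environment variable KAFKA_CONNECTION_TIMEOUT_MS are assumptions made for illustration; only the connectionTimeout option itself comes from KafkaJS.

```javascript
// Sketch only: withConnectionTimeout and KAFKA_CONNECTION_TIMEOUT_MS are
// hypothetical names; `connectionTimeout` is the KafkaJS client option.
function withConnectionTimeout(baseConfig, env = process.env) {
  const parsed = Number.parseInt(env.KAFKA_CONNECTION_TIMEOUT_MS, 10);
  // Fall back to the 25000 ms value from the answer when the variable
  // is unset or not a positive number
  const connectionTimeout = Number.isFinite(parsed) && parsed > 0 ? parsed : 25000;
  // Return a copy so the caller's config object is not mutated
  return { ...baseConfig, connectionTimeout };
}

// Usage:
//   const kafka = new Kafka(withConnectionTimeout({ clientId, brokers, ssl }));
```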
