node.js和kafka:不发布到kafka主题的消息

kfgdxczn  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(468)

与node.js相比,有人能告诉我为什么不能将事件发送到kafka主题吗?但是代码没有产生任何错误。

var express = require('express');
var path = require('path');
var favicon = require('serve-favicon');
var logger = require('morgan');
var cookieParser = require('cookie-parser');
var bodyParser = require('body-parser');
var config = require('./config.js');
var nforce = require('nforce');
var dateTime = require('node-datetime');
var kafka = require('kafka-node');

var routes = require('./routes/index');

var dt = dateTime.create();

var org = nforce.createConnection({
  clientId: config.CLIENT_ID,
  clientSecret: config.CLIENT_SECRET,
  redirectUri: config.CALLBACK_URL + '/oauth/_callback',
  mode: 'multi',
  environment: config.ENVIRONMENT  // optional, sandbox or production, production default
});

HighLevelProducer = kafka.HighLevelProducer,
client = new kafka.Client('localhost:9092'),
producer = new HighLevelProducer(client),

org.authenticate({ username: config.USERNAME, password: config.PASSWORD }, function(err, oauth) {

  if(err) return console.log(err);
  if(!err) {
    console.log('***Successfully connected to Salesforce***');
    // add any logic to perform after login
  }

  // subscribe to a pushtopic
  var str = org.stream({ topic: config.PUSH_TOPIC, oauth: oauth });

  str.on('connect', function(){
    console.log('Connected to pushtopic: ' + config.PUSH_TOPIC);
  });

  str.on('error', function(error) {
    console.log('Error received from pushtopic: ' + error);
  });

  str.on('data', function(data) {
    console.log('Received the following from pushtopic ---');
    var dataStream1 = data['sobject'];
    dataStream1['timestamp'] = dt.format('Y-m-d H:M:S');
    console.log(dataStream1);
    var dataStreamFinal = '[' + JSON.stringify(dataStream1) + ']';

    payloads = [
        { topic: 'testing1', messages: dataStreamFinal }
    ];

    console.log(payloads);

    producer.on('ready', function () {
    producer.send(payloads, function (err, data) {
        console.log(data);
    });
});

});

});

module.exports = {app: app, server: server};

我正在使用node的以下模块https://github.com/sohu-co/kafka-node
在此之前,我只是添加了代码,从salesforce pushtopic中提取事件,并将其发布到kafka是我的目标。

rnmwe5a2

rnmwe5a21#

终于可以把数据推给Kafka了。尽管nforce还不支持重播消息功能。

var express = require('express');
var path = require('path');
var favicon = require('serve-favicon');
var logger = require('morgan');
var cookieParser = require('cookie-parser');
var bodyParser = require('body-parser');
var config = require('./config.js');
var nforce = require('nforce');
var dateTime = require('node-datetime');

var routes = require('./routes/index');

var app = express();
var server = require('http').Server(app);
var dt = dateTime.create();

var io = require('socket.io')(server);
// get a reference to the socket once a client connects
var socket = io.sockets.on('connection', function (socket) { });

var org = nforce.createConnection({
  clientId: config.CLIENT_ID,
  clientSecret: config.CLIENT_SECRET,
  redirectUri: config.CALLBACK_URL + '/oauth/_callback',
  mode: 'multi',
  environment: config.ENVIRONMENT,  // optional, sandbox or production, production default
    apiVersion: 'v43.0'
});

org.authenticate({ username: config.USERNAME, password: config.PASSWORD }, function(err, oauth) {

  if(err) return console.log(err);
  if(!err) {
    console.log('***Successfully connected to Salesforce***');
    // add any logic to perform after login
  }

  // subscribe to a pushtopic
  var str = org.stream({ topic: config.PUSH_TOPIC, oauth: oauth });

  str.on('connect', function(){
    console.log('Connected to pushtopic: ' + config.PUSH_TOPIC);
  });

  str.on('error', function(error) {
    console.log('Error received from pushtopic: ' + error);
  });

  str.on('data', function(data) {
    console.log('Received the following from pushtopic ---');
    console.log(data);
    var dataStream1 = data['sobject'];
    dataStream1['timestamp'] = dateTime.create().format('Y-m-d H:M:S');
    var dataStreamFinal = '[' + JSON.stringify(dataStream1) + ']';

    var kafka = require('kafka-node'),
        HighLevelProducer = kafka.HighLevelProducer,
        client = new kafka.Client(),
        producer = new HighLevelProducer(client),
        payloads = [
        { topic: 'testing1', messages: dataStreamFinal}
    ];

    producer.on('ready', function () {
        producer.send(payloads, function (err, data) {
        console.log(data);
    });
});

    // emit the record to be displayed on the page

//Send to kafka topic
    socket.emit('record-processed', JSON.stringify(dataStreamFinal));
  });

});

// view engine setup
app.set('views', path.join(__dirname, 'views'));
app.set('view engine', 'hbs');

app.use(function(req, res, next){
  res.io = io;
  next();
});

// uncomment after placing your favicon in /public
//app.use(favicon(path.join(__dirname, 'public', 'favicon.ico')));
app.use(logger('dev'));
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: false }));
app.use(cookieParser());
app.use(express.static(path.join(__dirname, 'public')));

app.use('/', routes);

// catch 404 and forward to error handler
app.use(function(req, res, next) {
  var err = new Error('Not Found');
  err.status = 404;
  next(err);
});

// error handlers

// development error handler
// will print stacktrace
if (app.get('env') === 'development') {
  app.use(function(err, req, res, next) {
    res.status(err.status || 500);
    res.render('error', {
      message: err.message,
      error: err
    });
  });
}

// production error handler
// no stacktraces leaked to user
app.use(function(err, req, res, next) {
  res.status(err.status || 500);
  res.render('error', {
    message: err.message,
    error: {}
  });
});

module.exports = {app: app, server: server};

相关问题