我已经使用 express.js
,其中我还发布到 kafka
. 我用的是 kafka-node
. 下面是我的 server.js
以及 app.js
以及 settings-router.js
按顺序: server.js
:
var app = require('./app');
const http = require('http');
require('dotenv').config();
var port = normalizePort(process.env.PORT || '3999');
app.set('port', port);
var server = http.createServer(app);
server.listen(port);
server.on('error', onError);
server.on('listening', onListening);
server.on('close', onClose);
function normalizePort(val) {
var port = parseInt(val, 10);
if (isNaN(port)) {
return val;
}
if (port >= 0) {
return port;
}
return false;
}
const unhandledRejections = new Map();
process.on('unhandledRejection', (reason, promise) => {
unhandledRejections.set(promise, reason);
if (reason instanceof Error) {
console.log(reason,{label: 'ServerError', scope: 'ServerError', message: `Unhandled Rejection.`} );
}
});
process.on('rejectionHandled', (promise) => {
unhandledRejections.delete(promise);
});
process.on('uncaughtException', (err, origin) => {
console.warn(`{"timestamp": "${new Date().toISOString()}","message":"Uncaught Exception.","scope":"ServerError","error":"${err.name}. ${err.message}", "stack":"${err.stack}","origin":"${origin}"}`);
console.log(err, 'uncaughtException', {label: 'ServerError', origin: origin, scope: 'ServerError'});
});
process.on('warning', (warning) => {
console.log(warning, 'warning', {label: 'ServerWarning', origin: origin, scope: 'ServerError'});
});
process.on('beforeExit', (code) => {
});
process.on('exit', (code) => {
logInfo({message: `Process exit event with code ${code}.`, scope: 'Server'});
});
function onError(error) {
if (error.syscall !== 'listen') {
throw error;
}
var bind = typeof port === 'string'
? 'Pipe ' + port
: 'Port ' + port;
switch (error.code) {
case 'EACCES':
console.log(error, {message: `${bind} requires elevated privileges. `, label: 'ServerError', scope: 'ServerError'});
process.exit(1);
break;
case 'EADDRINUSE':
console.log(error, {message: `${bind} is already in use. `, label: 'ServerError', scope: 'ServerError'});
process.exit(1);
break;
default:
throw error;
}
}
var KafkaServices = require('./services/kafka-services');
kafka = require('kafka-node');
function onListening() {
var addr = server.address();
var bind = typeof addr === 'string'
? 'pipe ' + addr
: 'port ' + addr.port;
console.log('Listening on ' + bind);
logInfo({message: `Server is Listening on http://localhost:${addr.port}.`, scope: 'Server'});
kafkaTool = new KafkaServices();
// Connect to kafka
try{
kafkaTool.producerOn();
} catch (err) {
logError(err, 'Error connecting to Kafka', 'Kafka');
}
}
function onClose() {
var addr = server.address();
var bind = typeof addr === 'string'
? 'pipe ' + addr
: 'port ' + addr.port;
logInfo({message: `Server http://localhost:${addr.port} closed.`, scope: 'Server'});
}
module.exports = {app, onClose}
``` `app.js` :
const express = require('express');
const helmet = require('helmet');
const cookieParser = require('cookie-parser');
var createError = require('http-errors');
var path = require('path');
const settingsRouter = require('./routes/settings-router');
const intercept_request = require('./services/interceptor-service');
const app = express();
app.set('views', path.join(__dirname, 'views'));
app.set('view engine', 'pug');
app.use(helmet());
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());
app.use(intercept_request);
app.use('/api/v1/settings', settingsRouter);
app.use(function(req, res, next) {
next(createError(404));
});
app.use(function(err, req, res, next) {
// set locals, only providing error in development
res.locals.message = err.message;
res.locals.error = req.app.get('env') === 'development' ? err : {};
res.status(err.status || 500);
res.render('error');
});
module.exports = app;
``` settings-router.js
:
const express = require('express');
const router = express.Router();
const KafkaServices = require('../services/kafka-services');;
const { isNil } = require('lodash');
const {logInfo, logError} = require('../services/logger');
router.post('/', async (req, res, next) => {
if (isNil(req['api_key'])) {
next();
} else {
try{
if (isNil(req.body)) {
return res.status(400).send({
message: 'Request body is empty or null.',
});
}
let payload = {
topic: process.env.KAFKA_TOPIC || 'fc_ds_devicesettings',
messages: [], // multi messages should be a array, single message can be just a string or a KeyedMessage instance
key: null, // string or buffer, only needed when using keyed partitioner
timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10+)
};
payload.key = isNil(req.body.DeviceId) ? '': req.body.DeviceId;
payload.messages = JSON.stringify(req.body);
let kafkaTool = new KafkaServices();
let result = await kafkaTool.publish(new Array(payload));
console.log({message: 'Settings API Result.', data: result}, 'API')
} catch(err){
console.log(err, { message: 'Error Caught in POST /api/v1/settings.', data: req.body}, 'API')
}
return res.send({
message: 'Settings sent for processing.',
payload: req.body
});
}
}, async (req, res, next) => {
console.log({message: 'Unauthorized access to settings API', data: req.body}, 'API');
return res.status(401).send({
message: 'Unauthorized access to settings API.'
});
});
router.get('/temp', async (req, res, next) => {
let kafkaTool = new KafkaServices();
let result = await kafkaTool.publish(new Array({
topic: process.env.KAFKA_TOPIC || 'fc_ds_devicesettings',
messages: 'Test Message',
key: '1234567',
timestamp: Date.now()
}));
return res.status(200).json({
message: 'Settings sent for processing.',
result: result
});
});
module.exports = router;
这一切都在起作用。我能用POST方法来处理有效载荷,这太好了!我想用摩卡茶做些测试。下面是我的测试,但我得到了错误: TypeError: Cannot read property 'body' of undefined at chai.request.post.type.send.then
我不知道为什么。在此方面的任何帮助都将不胜感激!谢谢
测试文件: post.js
:
process.env.NODE_ENV = 'test';
const server = require('../server');
const app = server.app;
const settingsRouter = require('../routes/settings-router');
let chai = require('chai');
let chaiHttp = require('chai-http');
let should = chai.should();
chai.use(chaiHttp);
describe('/POST api/v1/settings/', () => {
it('It should post a full payload', (done) =>{
chai.request(app)
.post('localhost:3999/api/v1/settings/')
.type('Form')
.send({"DeviceId": "1111111111",
"DeviceType": 0,
"Settings": [
{
"Key": "Test",
"Value": "true"
}
]
})
.then((err, res) => {
try{
console.log(res.body);
res.should.have.status(200);
res.body.should.be.a('object');
res.body.should.have.property('message');
res.body.should.have.property('payload');
} catch(err) {
console.log(err);
}
done();
});
});
});
暂无答案!
目前还没有任何答案,快来回答吧!