I ran a latency test with the NATS benchmarking tools, using two types of NATS clients.
A) The native NATS client (NATS CLI), which can be downloaded and installed from https://github.com/nats-io/natscli/releases
Procedure
Producer command: nats bench foo --pub 1 --request --msgs 100000 --size 3kb
Consumer command: nats bench foo --sub 1 --reply
Version: 0.0.35
Result: ~6600 msg/s
Latency: 1/(2*6600) ≈ 0.075 ms
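Spelled out, the reasoning behind that estimate (assuming the reported rate corresponds to synchronous request/reply exchanges, so the one-way latency is roughly half of one round trip):

$\mathrm{RTT} \approx \frac{1}{\mathrm{rate}} = \frac{1}{6600\ \mathrm{msg/s}} \approx 0.15\ \mathrm{ms}, \qquad \mathrm{latency} \approx \frac{\mathrm{RTT}}{2} = \frac{1}{2 \times 6600} \approx 0.075\ \mathrm{ms}$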
B) I then used the NATS client for NodeJS and wrote a script, inspired by the one provided in the NATS NodeJS GitHub repository, to run the same benchmark as with the native NATS client.
bench.js
#!/usr/bin/env node
const parse = require("minimist");
const {Nuid, connect, Bench, Metric} = require('nats')
const defaults = {
s: "127.0.0.1:4222",
c: 100000,
p: 128,
subject: new Nuid().next(),
i: 1,
json: false,
csv: false,
csvheader: false,
pending: 1024 * 32,
};
const argv = parse(
process.argv.slice(2),
{
alias: {
"s": ["server"],
"c": ["count"],
"d": ["debug"],
"p": ["payload"],
"i": ["iterations"],
},
default: defaults,
string: [
"subject",
],
boolean: [
"asyncRequests",
"callbacks",
"json",
"csv",
"csvheader",
],
},
);
if (argv.h || argv.help || (!argv.sub && !argv.pub && !argv.req)) {
console.log(
"usage: bench.ts [--json] [--callbacks] [--csv] [--csvheader] [--pub] [--sub] [--req (--asyncRequests)] [--count <#messages>=100000] [--payload <#bytes>=128] [--iterations <#loop>=1>] [--server server] [--subject <subj>]\n",
);
process.exit(0);
}
const server = argv.server;
const count = parseInt(argv.count);
const bytes = parseInt(argv.payload);
const iters = parseInt(argv.iterations);
const pl = parseInt(argv.pendingLimit) * 1024;
const metrics = [];
(async () => {
for (let i = 0; i < iters; i++) {
const nc = await connect(
{ servers: server, debug: argv.debug, pending: argv.pending },
);
const opts = {
msgs: count,
size: bytes,
asyncRequests: argv.asyncRequests,
callbacks: argv.callbacks,
pub: argv.pub,
sub: argv.sub,
req: argv.req,
rep: argv.rep,
subject: argv.subject,
};
const bench = new Bench(nc, opts);
const m = await bench.run();
metrics.push(...m);
await nc.close();
}
})().then(() => {
const reducer = (a, m) => {
if (a) {
a.name = m.name;
a.payload = m.payload;
a.bytes += m.bytes;
a.duration += m.duration;
a.msgs += m.msgs;
a.lang = m.lang;
a.version = m.version;
a.async = m.async;
a.max = Math.max(a.max === undefined ? 0 : a.max, m.duration);
a.min = Math.min(a.min === undefined ? m.duration : a.min, m.duration);
}
return a;
};
if (!argv.json && !argv.csv) {
const pubsub = metrics.filter((m) => m.name === "pubsub").reduce(
reducer,
new Metric("pubsub", 0),
);
const pub = metrics.filter((m) => m.name === "pub").reduce(
reducer,
new Metric("pub", 0),
);
const sub = metrics.filter((m) => m.name === "sub").reduce(
reducer,
new Metric("sub", 0),
);
const req = metrics.filter((m) => m.name === "req").reduce(
reducer,
new Metric("req", 0),
);
const rep = metrics.filter((m) => m.name === "rep").reduce(
reducer,
new Metric("rep", 0),
);
if (pubsub && pubsub.msgs) {
console.log(pubsub.toString());
}
if (pub && pub.msgs) {
console.log(pub.toString());
}
if (sub && sub.msgs) {
console.log(sub.toString());
}
if (req && req.msgs) {
console.log(req.toString());
}
if (rep && rep.msgs) {
console.log(rep.toString());
}
} else if (argv.json) {
console.log(JSON.stringify(metrics, null, 2));
} else if (argv.csv) {
const lines = metrics.map((m) => {
return m.toCsv();
});
if (argv.csvheader) {
lines.unshift(Metric.header());
}
console.log(lines.join(""));
}
});
Procedure
Command: node bench.js --subject test --req --payload 3500 --count 100000
NodeJS client version: 2.1.14
Result: ~330 msg/s
Latency: 1/(2*330) ≈ 1.5 ms
C) In addition, I tried creating two clients - a publisher that sends messages and a subscriber that consumes them - to measure the transport time of the messages directly, without going through the benchmark tool. The results are of the same order of magnitude as the latency computed with the bench.js benchmark.
pub.js
#!/usr/bin/env node
const parse = require("minimist");
const { connect, StringCodec, headers, credsAuthenticator } = require("nats");
const { delay } = require("./utils.js");
const fs = require("fs");
const argv = parse(
process.argv.slice(2),
{
alias: {
"s": ["server"],
"c": ["count"],
"i": ["interval"],
"f": ["creds"],
"h": ["headers"]
},
default: {
s: "127.0.0.1:4222",
c: 1,
i: 0,
},
boolean: true,
string: ["server", "count", "interval", "headers", "creds"],
},
);
const opts = { servers: argv.s };
const subject = String(argv._[0]);
const payload = argv._[1] || "";
const count = (argv.c === -1 ? Number.MAX_SAFE_INTEGER : argv.c) || 1;
const interval = argv.i || 0;
if (argv.h || argv.help || !subject) {
console.log(
"Usage: nats-pub [--creds=/path/file.creds] [-s server] [-c <count>=1] [-i <interval>=0] [-h='k=v;k2=v2'] subject [msg]",
);
console.log("to publish forever, specify -c=-1 or --count=-1");
process.exit(1);
}
if (argv.debug) {
opts.debug = true;
}
if (argv.creds) {
const data = fs.readFileSync(argv.creds);
opts.authenticator = credsAuthenticator(data);
}
(async () => {
let nc;
try {
nc = await connect(opts);
} catch (err) {
console.log(`error connecting to nats: ${err.message}`);
return;
}
console.info(`connected ${nc.getServer()}`);
nc.closed()
.then((err) => {
if (err) {
console.error(`closed with an error: ${err.message}`);
}
});
const hdrs = headers();
if (argv.headers) {
argv.headers.split(";").map((l) => {
const [k, v] = l.split("=");
hdrs.append(k, v);
});
}
hdrs.append("time", "0");
const sc = StringCodec();
for (let i = 1; i <= count; i++) {
const pubopts = {};
hdrs.set("time", Date.now().toString());
pubopts.headers = hdrs;
nc.publish(subject, sc.encode(String(payload)), pubopts);
console.log(`[${i}] ${subject}: ${payload}`);
if (interval) {
await delay(interval);
}
}
await nc.flush();
await nc.close();
})();
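pub.js imports a delay helper from ./utils.js, which is not shown above. A minimal sketch of what it is assumed to contain (just a promise-based sleep), so the script can run as posted:
utils.js
// Assumed helper used by pub.js: resolves a promise after ms milliseconds.
exports.delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));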
sub.js
#!/usr/bin/env node
const parse = require("minimist");
const { connect, StringCodec, credsAuthenticator } = require('nats');
const fs = require("fs");
const argv = parse(
process.argv.slice(2),
{
alias: {
"d": ["debug"],
"s": ["server"],
"q": ["queue"],
"f": ["creds"]
},
default: {
s: "127.0.0.1:4222",
q: "",
},
boolean: ["debug"],
string: ["server", "queue", "creds"]
}
);
const usage = () => {
console.log(
"Usage: nats-sub [-s server] [--creds=/path/file.creds] [-q queue] [-h] subject",
);
}
const opts = { servers: argv.s };
const subject = argv._[0] ? String(argv._[0]) : null;
if (argv.h || argv.help || !subject) {
usage();
process.exit(1);
}
if (argv.d) {
opts.debug = true;
}
if (argv.creds) {
const data = fs.readFileSync(argv.creds);
opts.authenticator = credsAuthenticator(data);
}
(async () => {
let nc;
try {
nc = await connect(opts);
} catch (err) {
console.log(`error connecting to nats: ${err.message}`);
return;
}
console.info(`connected ${nc.getServer()}`);
nc.closed()
.then((err) => {
if (err) {
console.error(`closed with an error: ${err.message}`);
}
});
const sc = StringCodec();
const sub = nc.subscribe(subject, { queue: argv.q });
console.info(`${argv.q !== "" ? "queue " : ""}listening to ${subject}`);
for await (const m of sub) {
let receivedTime = Date.now();
let transportTime = null;
if (m.headers) {
for (const [key, value] of m.headers) {
if (key === 'time') {
transportTime = receivedTime - parseInt(value.toString(), 10);
break;
}
}
}
console.log(`[${sub.getProcessed()}]: ${m.subject}: ${sc.decode(m.data)} after ${(transportTime || -1)} ms`);
}
})();
Procedure
Publish command: node pub.js -c 50 -i 10 test --debug
Subscribe command: node sub.js --debug test
Version: 2.1.14
Transport time: ~2.5 ms
**My question is:** Why is there such a significant difference between the NATS CLI and the NodeJS client? Is it down to the language? Is NATS natively implemented in Go?
1 Answer
The short answer is that the Go client (the one used by the `nats` tool) will be much faster: it can use separate threads to send/receive and to process the network traffic. In Node and the other JavaScript runtimes, while the client is doing one thing it cannot do anything else - so if it is busy doing I/O, it cannot process subscription messages, and so on.
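A minimal, NATS-free sketch of that effect (purely illustrative; the file name and the 100 ms of busy work are made up): while synchronous work holds the single JS thread, even an already-due timer - or an already-received subscription message - has to wait.
busy.js
const start = Date.now();
// This timer is due after 10 ms, but it cannot fire while the thread is busy.
setTimeout(() => {
  console.log(`timer fired after ${Date.now() - start} ms (scheduled for 10 ms)`);
}, 10);
// Simulate CPU-bound work (payload encoding, benchmark bookkeeping, ...);
// no socket reads, timers or subscription callbacks are serviced in the meantime.
while (Date.now() - start < 100) {}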
The I/O of the tests that involve a publisher can also be affected by the size of the outbound buffer. By default this is 32K in the JavaScript client, which is a good working size (I believe there is an option to change it in the bench; if not, I will update the bench). When publishing in a loop, the buffer is written to the network once it fills up - normally the client would be idle at that point and the buffer would simply be flushed on the next turn of the event loop, but in a tight benchmark loop it fills, has to be sent, and only then does the loop continue. So the size of this buffer largely determines the network I/O.
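A rough illustration of that buffering behaviour (a sketch only - the subject and payload size are borrowed from the tests above, and the 32K threshold is the default mentioned here): publishes merely append to the client's outbound buffer, and nc.flush() forces it onto the wire and waits for the server to acknowledge.
publoop.js
const { connect, StringCodec } = require("nats");
(async () => {
  const nc = await connect({ servers: "127.0.0.1:4222" });
  const sc = StringCodec();
  const payload = sc.encode("x".repeat(3500)); // ~3.5 KB, as in the request test
  for (let i = 0; i < 1000; i++) {
    // publish() only appends to the outbound buffer; once the buffer (32K by
    // default) is full it must be written to the socket before the loop can
    // continue, and those writes dominate the timing of a tight publish loop.
    nc.publish("test", payload);
  }
  await nc.flush(); // send whatever is still buffered and wait for the server
  await nc.close();
})();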
In your screenshots you are running with `debug` enabled. Never put the server or the client in debug mode while running any kind of performance test.
Request tests
There are a few things you should try (an example command combining them follows this list):
- The --asyncRequests option. Without it requests are processed serially - the second request is not sent until the first one has come back - so the RTT will obviously look terrible.
- The --callbacks option, which removes the iterator from message processing; for a normal client you should probably just use the iterator.
- The --req bench is sub-optimal, because it does four operations on the network per request (it sets up the responder on the same connection - i.e. the client sends the request to the server, the server forwards it to the subscriber, the subscriber sends the response back to the server, and the server delivers the response to the client). Dividing by 2 is therefore only a rough approximation.
- With --count at 1M messages, and each request creating a promise, you will quickly run into slow consumers or other problems once the number of outstanding promises grows. I would keep this at 10K or fewer for request tests.
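For example, a request run that applies these suggestions could look like this (only a suggestion - the flags are the ones defined in bench.js above, and the subject and payload are taken from the earlier run):
Command: node bench.js --subject test --req --asyncRequests --count 10000 --payload 3500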
Publish tests
Raw blast performance of the Node client:
Sub tests
Specifying the --callbacks option will improve performance. Also, the publisher implementation will limit how fast the sub can go, so: [repeat of the NATS bench above]
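For instance, a combined pub/sub run with callbacks (again just a suggestion, using flags defined in bench.js above) might look like:
Command: node bench.js --subject test --pub --sub --callbacks --count 100000 --payload 3500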
If you run both at the same time, the numbers will be slightly different, mainly because JavaScript is either publishing or processing - not both at once.