NodeJS的NATS客户端非常慢

3qpi33ja  于 2023-06-05  发布在  Node.js
关注(0)|答案(1)|浏览(315)

我用NATS基准测试工具执行了一个延迟测试,使用了两种类型的NATS客户机。
A)原生 NATS 客户端(NATS CLI),可从 https://github.com/nats-io/natscli/releases 下载安装
程序
生产者命令:nats bench foo --pub 1 --request --msgs 100000 --size 3kb
消费者命令:nats bench foo --sub 1 --reply
版本:0.0.35
结果:~6600 Msg/s
延迟:1/(2*6600) = 0.075 ms

B)然后我使用 NodeJS 版 NATS 客户端编写了一个脚本,以执行与原生 NATS 客户端相同的基准测试。该脚本参考了 NATS NodeJS GitHub 仓库中提供的示例。
bench.js

#!/usr/bin/env node

const parse = require("minimist");
const {Nuid, connect, Bench, Metric} = require('nats')

// Default values for the CLI options; minimist fills these in for any flag
// that is not given on the command line.
const defaults = {
  s: "127.0.0.1:4222",
  c: 100000,
  p: 128,
  // A fresh NUID, so concurrent benchmark runs do not share a subject.
  subject: new Nuid().next(),
  i: 1,
  json: false,
  csv: false,
  csvheader: false,
  // Forwarded to connect() as the `pending` connection option.
  pending: 1024 * 32,
};

const argv = parse(
  process.argv.slice(2),
  {
    alias: {
      "s": ["server"],
      "c": ["count"],
      "d": ["debug"],
      "p": ["payload"],
      "i": ["iterations"],
    },
    default: defaults,
    string: [
      "subject",
    ],
    boolean: [
      "asyncRequests",
      "callbacks",
      "json",
      "csv",
      "csvheader",
    ],
  },
);

// At least one of --pub/--sub/--req must be selected, otherwise there is
// nothing to benchmark.
if (argv.h || argv.help || (!argv.sub && !argv.pub && !argv.req)) {
  console.log(
    "usage: bench.js [--json] [--callbacks] [--csv] [--csvheader] [--pub] [--sub] [--req (--asyncRequests)] [--count <#messages>=100000] [--payload <#bytes>=128] [--iterations <#loop>=1] [--server server] [--subject <subj>]\n",
  );
  process.exit(0);
}

const server = argv.server;
// Explicit radix: the numeric options are always decimal.
const count = parseInt(argv.count, 10);
const bytes = parseInt(argv.payload, 10);
const iters = parseInt(argv.iterations, 10);
// Collects every Metric produced across all iterations.
const metrics = [];

// Runs the benchmark `iters` times, each on a fresh connection, and collects
// every Metric returned by Bench.run() into `metrics`. Afterwards it
// aggregates and prints them as a text summary, JSON, or CSV.
(async () => {
  for (let i = 0; i < iters; i++) {
    const nc = await connect(
      { servers: server, debug: argv.debug, pending: argv.pending },
    );
    try {
      const opts = {
        msgs: count,
        size: bytes,
        asyncRequests: argv.asyncRequests,
        callbacks: argv.callbacks,
        pub: argv.pub,
        sub: argv.sub,
        req: argv.req,
        rep: argv.rep,
        subject: argv.subject,
      };

      const bench = new Bench(nc, opts);
      const m = await bench.run();
      metrics.push(...m);
    } finally {
      // Close even when a run throws, so a failed iteration does not leave
      // its connection open.
      await nc.close();
    }
  }
})().then(() => {
  // Folds one run's Metric `m` into the running total `a`.
  // bytes/duration/msgs are summed; max/min track the slowest and fastest
  // single run by duration.
  const reducer = (a, m) => {
    if (a) {
      a.name = m.name;
      a.payload = m.payload;
      a.bytes += m.bytes;
      a.duration += m.duration;
      a.msgs += m.msgs;
      a.lang = m.lang;
      a.version = m.version;
      a.async = m.async;

      a.max = Math.max(a.max === undefined ? 0 : a.max, m.duration);
      // Seed from the first run, then keep the smallest duration seen so far.
      a.min = Math.min(a.min === undefined ? m.duration : a.min, m.duration);
    }
    return a;
  };

  if (!argv.json && !argv.csv) {
    // Human-readable summary: one aggregated line per benchmark kind that
    // actually produced messages, in a fixed order.
    for (const name of ["pubsub", "pub", "sub", "req", "rep"]) {
      const total = metrics
        .filter((m) => m.name === name)
        .reduce(reducer, new Metric(name, 0));
      if (total && total.msgs) {
        console.log(total.toString());
      }
    }
  } else if (argv.json) {
    console.log(JSON.stringify(metrics, null, 2));
  } else if (argv.csv) {
    const lines = metrics.map((m) => m.toCsv());
    if (argv.csvheader) {
      lines.unshift(Metric.header());
    }
    console.log(lines.join(""));
  }
}).catch((err) => {
  // Report connection/benchmark failures instead of leaving an unhandled
  // rejection, and signal failure through the exit code.
  console.error("bench failed:", err);
  process.exitCode = 1;
});

程序
命令:node bench.js --subject test --req --payload 3500 --count 100000
NodeJS客户端版本:2.1.14
结果:~330 Msg/s
延迟:1/(2*330) = 1.5 ms

C)此外,我尝试创建两个客户端——一个发送消息的发布者和一个消费消息的订阅者——在不借助基准测试工具的情况下测量相关消息的传输时间。结果与基准测试工具 bench.js 计算出的延迟处于同一数量级。
pub.js

#!/usr/bin/env node

const parse = require("minimist");
const { connect, StringCodec, headers, credsAuthenticator } = require("nats");
const { delay } = require("./utils.js");
const fs = require("fs");

// Command-line options. -h is the short form of --headers (see the usage
// text), so only --help requests the usage message.
const argv = parse(
  process.argv.slice(2),
  {
    alias: {
      "s": ["server"],
      "c": ["count"],
      "i": ["interval"],
      "f": ["creds"],
      "h": ["headers"]
    },
    default: {
      s: "127.0.0.1:4222",
      c: 1,
      i: 0,
    },
    boolean: true,
    string: ["server", "count", "interval", "headers", "creds"],
  },
);

const opts = { servers: argv.s };
// Positional arguments: subject, then an optional message payload.
// Avoid String(undefined) === "undefined", which would defeat the check below.
const subject = argv._[0] !== undefined ? String(argv._[0]) : "";
// `!== undefined` rather than `||`, so a numeric payload of 0 is kept.
const payload = argv._[1] !== undefined ? argv._[1] : "";
// "count" and "interval" are declared as string options, so minimist returns
// strings such as "-1" or "10"; convert before comparing with numbers.
const requestedCount = Number(argv.c);
const count = (requestedCount === -1 ? Number.MAX_SAFE_INTEGER : requestedCount) || 1;
const interval = Number(argv.i) || 0;

if (argv.help || !subject) {
    console.log(
      "Usage: nats-pub [--creds=/path/file.creds] [-s server] [-c <count>=1] [-i <interval>=0] [-h='k=v;k2=v2'] subject [msg]",
    );
    console.log("to publish forever, specify -c=-1 or --count=-1");
    process.exit(1);
  }

if (argv.debug) {
  opts.debug = true;
}

if (argv.creds) {
  const data = fs.readFileSync(argv.creds);
  opts.authenticator = credsAuthenticator(data);
}

/**
 * Splits a header spec such as "k=v;k2=v2" into [key, value] pairs.
 * Segments with an empty key (including empty segments) are skipped. Only the
 * first "=" separates key from value, so values may themselves contain "=".
 * A segment without "=" yields an empty value.
 *
 * @param {string} spec - header spec from the -h/--headers option
 * @returns {Array<[string, string]>} key/value pairs in spec order
 */
function parseHeaderSpec(spec) {
  const pairs = [];
  for (const segment of spec.split(";")) {
    const eq = segment.indexOf("=");
    const key = eq === -1 ? segment : segment.slice(0, eq);
    const value = eq === -1 ? "" : segment.slice(eq + 1);
    if (key === "") {
      continue;
    }
    pairs.push([key, value]);
  }
  return pairs;
}

// Connects and publishes `count` messages to `subject`, then flushes and
// closes. Each message carries a "time" header holding Date.now() at send time,
// which the subscriber uses to compute transit time.
(async () => {
  let nc;
  try {
    nc = await connect(opts);
  } catch (err) {
    console.log(`error connecting to nats: ${err.message}`);
    return;
  }
  console.info(`connected ${nc.getServer()}`);

  nc.closed()
    .then((err) => {
      if (err) {
        console.error(`closed with an error: ${err.message}`);
      }
    });

  const hdrs = headers();
  if (argv.headers) {
    for (const [k, v] of parseHeaderSpec(argv.headers)) {
      hdrs.append(k, v);
    }
  }
  // Ensure a "time" header exists; it is overwritten before every publish.
  hdrs.append("time", "0");

  const sc = StringCodec();
  // The payload and options never change, so build them once.
  const data = sc.encode(String(payload));
  const pubopts = { headers: hdrs };

  try {
    for (let i = 1; i <= count; i++) {
      hdrs.set("time", Date.now().toString());
      nc.publish(subject, data, pubopts);
      console.log(`[${i}] ${subject}: ${payload}`);
      if (interval) {
        await delay(interval);
      }
    }
    await nc.flush();
  } finally {
    // Close even if publishing or flushing fails.
    await nc.close();
  }
})().catch((err) => {
  console.error("publish failed:", err);
  process.exitCode = 1;
});

sub.js

#!/usr/bin/env node

const parse = require("minimist");
const { connect, StringCodec, credsAuthenticator } = require('nats');
const fs = require("fs");

// Command-line options: -s/--server, -q/--queue, -f/--creds, -d/--debug.
// The first positional argument is the subject to subscribe to.
const argv = parse(process.argv.slice(2), {
  alias: {
    d: ["debug"],
    s: ["server"],
    q: ["queue"],
    f: ["creds"],
  },
  default: {
    s: "127.0.0.1:4222",
    q: "",
  },
  boolean: ["debug"],
  string: ["server", "queue", "creds"],
});

/** Prints the command-line synopsis for this subscriber. */
function usage() {
  console.log(
    "Usage: nats-sub [-s server] [--creds=/path/file.creds] [-q queue] [-h] subject",
  );
}

const firstArg = argv._[0];
const subject = firstArg ? String(firstArg) : null;

const helpRequested = argv.h || argv.help;
if (helpRequested || !subject) {
  usage();
  process.exit(1);
}

// Connection options for connect(): the server, plus optional debug output
// and a credentials-file authenticator.
const opts = { servers: argv.s };
if (argv.d) {
  opts.debug = true;
}
if (argv.creds) {
  opts.authenticator = credsAuthenticator(fs.readFileSync(argv.creds));
}

/**
 * Computes a message's transit time from the publisher's "time" header.
 *
 * @param {Iterable<[string, *]>|undefined} hdrs - message headers (iterated as
 *   [key, value] pairs); may be undefined when the message has none
 * @param {number} receivedTime - receive timestamp in ms since epoch
 * @returns {number|null} receivedTime minus the sent timestamp, or null when
 *   there is no "time" header or its value is not an integer
 */
function transportTimeMs(hdrs, receivedTime) {
  if (!hdrs) {
    return null;
  }
  for (const [key, value] of hdrs) {
    if (key === "time") {
      const sentTime = parseInt(value.toString(), 10);
      return Number.isNaN(sentTime) ? null : receivedTime - sentTime;
    }
  }
  return null;
}

// Connects, subscribes to `subject` (optionally in queue group argv.q), and
// logs each message with its transit time. -1 is logged when the time is
// unknown.
(async () => {
  let nc;
  try {
    nc = await connect(opts);
  } catch (err) {
    console.log(`error connecting to nats: ${err.message}`);
    return;
  }

  console.info(`connected ${nc.getServer()}`);
  nc.closed()
    .then((err) => {
        if (err) {
            console.error(`closed with an error: ${err.message}`);
        }
    });

  const sc = StringCodec();
  const sub = nc.subscribe(subject, { queue: argv.q });
  console.info(`${argv.q !== "" ? "queue " : ""}listening to ${subject}`);
  for await (const m of sub) {
    // Timestamp first, so decoding and logging are not counted as transit.
    const receivedTime = Date.now();
    const transportTime = transportTimeMs(m.headers, receivedTime);
    // Only a missing/invalid header maps to -1; a real 0 ms transit prints 0.
    const shown = transportTime === null ? -1 : transportTime;
    console.log(`[${sub.getProcessed()}]: ${m.subject}: ${sc.decode(m.data)} after ${shown} ms`);
  }
})().catch((err) => {
  console.error("subscription failed:", err);
  process.exitCode = 1;
});

程序
命令发布:node pub.js -c 50 -i 10 test --debug
命令sub:node sub.js --debug test
版本:2.1.14
运输时间:~2.5ms

**我的问题是:**为什么NATS CLI和NodeJS客户端之间存在如此明显的差异?是因为语言吗?NATS是在Go中原生完成的吗?

z9smfwbn

z9smfwbn1#

简短的回答是:Go 客户端(nats 工具使用的客户端)会快得多,因为它可以使用不同的线程来发送/接收和处理网络流量。
在 Node 和其他 JavaScript 运行时中,客户端在做一件事时无法同时做其他事情。这意味着如果它正忙于 I/O,就无法处理订阅消息等。
涉及发布者的测试,其 I/O 可能受出站缓冲区大小的影响。在 JavaScript 客户端中,该缓冲区默认为 32 KB,这是一个比较合适的工作大小(我相信测试中有一个选项可以修改它;如果没有,我会更新 bench)。在循环中发布时,一旦出站缓冲区被填满,数据就会被发送到网络。通常客户端会继续处理而不做额外操作,并在下一个事件循环中刷新;但在测试循环中,缓冲区会被填满,必须先发送出去,然后才能继续。因此,这个缓冲区的大小在很大程度上影响网络 I/O。
在你的截图(以及上面的命令)中使用了 --debug。执行任何性能测试时,都不要让服务器或客户端处于调试模式。

请求测试

有几件事你应该尝试:

  • 对于请求测试,您没有使用 --asyncRequests 选项,因此请求是串行处理的——在第一个请求返回之前,第二个请求不会被发送。没有这个选项,RTT 显然会非常糟糕。
  • 测试中的订阅者在使用异步迭代器循环处理消息时性能不会很好。做测试时应使用 --callbacks 选项,它会在处理过程中去掉迭代器;而对于普通客户端,您大概只需使用迭代器即可。
  • --req 基准测试并不理想,因为每个请求要经过 4 次网络传输(它在同一连接上设置了订阅者:客户端向服务器发送请求,服务器把请求转发给订阅者,订阅者把响应发回服务器,服务器再把响应返回给客户端)。把结果除以 2 可以得到一个粗略的近似值。
  • 默认情况下,--count 为 100,000 条消息(见上面 bench.js 中的默认值)。请求测试会为每个请求创建一个 promise,promise 数量一旦激增,很快就会遇到慢消费者或其他问题。对于请求测试,我会把消息数保持在 10K 或更少。
node examples/bench.js --subject a --req --payload 3500 --count 10000
req 791 msgs/sec - [25.30 secs] ~ 2.64 MB/sec 25295/25295

node examples/bench.js --subject a --req --payload 3500 --count 10000 --asyncRequests
req 36,430 msgs/sec - [0.55 secs] ~ 243.20 MB/sec 549/549

node examples/bench.js --subject a --req --payload 3500 --count 10000 --asyncRequests --callbacks
req 38,462 msgs/sec - [0.52 secs] ~ 256.76 MB/sec 520/520

发布测试

节点客户端的原始blast性能:

node examples/bench.js --subject a --pub --payload 0
pub 917,431 msgs/sec - [0.11 secs] ~ 0.00 B/sec 109/109

# If you actually add a payload it will take longer (after all you are moving ~350MB: 100,000 msgs x 3,500 bytes):

node examples/bench.js --subject a --pub --payload 3500
pub 290,698 msgs/sec - [0.34 secs] ~ 970.31 MB/sec 344/344

子测试

node examples/bench.js --subject a --sub
sub 294,118 msgs/sec - [0.34 secs] ~ 981.72 MB/sec 340/340

# and in a different terminal - this will possibly 
nats bench --pub 1 --msgs 1000000 --size 3500 -s localhost:4222 a
11:48:16 Starting Core NATS pub/sub benchmark [subject=a, multisubject=false, multisubjectmax=0, msgs=1,000,000, msgsize=3.4 KiB, pubs=1, subs=0, pubsleep=0s, subsleep=0s]
11:48:16 Starting publisher, publishing 1,000,000 messages
Finished      1s

Pub stats: 608,014 msgs/sec ~ 1.98 GB/sec

指定--callbacks选项将提高性能。此外,发布者实现将限制sub的执行速度,因此:

node examples/bench.js --subject a --sub --callbacks
sub 370,370 msgs/sec - [0.27 secs] ~ 1.21 GB/sec 270/270

[重复运行上面的 nats bench 命令]
如果在同一进程中同时执行发布和订阅,数字会略有不同,这主要受 JavaScript 端发布或处理速度的影响。

node examples/bench.js --subject a --sub --callbacks --pub
pubsub 930,233 msgs/sec - [0.21 secs] ~ 113.55 MB/sec 215/215
pub 462,963 msgs/sec - [0.22 secs] ~ 56.51 MB/sec 216/216
sub 952,381 msgs/sec - [0.10 secs] ~ 116.26 MB/sec 105/105

相关问题