调整apache storm序列化程序的性能

jtjikinw  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(317)

我对java和ApacheStorm还很陌生,我想知道如何让事情发展得更快!我设置了一个storm集群,其中有两台物理机,每台8核。集群运行得非常好。为了测量性能,我设置了以下测试拓扑:

builder.setSpout("spout", new RandomNumberSpoutSingle(sizeOfArray), 10);
builder.setBolt("null", new NullBolt(), 4).allGrouping("spout");

randomnumberspoutsingle创建如下数组:

ArrayList<Integer> array = new ArrayList<Integer>();

我用sizeofarray整数填充它。这个数组与一个id结合,构建了我的元组。现在我用allgrouping测量每秒有多少元组到达bolt(我查看storm gui的“transfered”值)。
如果我把sizeofarray=1024,大约173000个tuples/s被推送。因为1个元组应该是大约4*1024字节,所以移动大约675mb/秒。
到目前为止我说的对吗?现在我的问题是:storm/kryo有能力移动更多吗?我该怎么调?有我忽略的设置吗?我想每秒序列化更多元组!如果我使用本地洗牌,值会猛增,因为不需要序列化任何内容,但我需要所有worker上的元组。cpu、内存和网络都没有被完全占用。

7lrncoxx

7lrncoxx1#

我认为你的计算是对的,但我不确定在序列化过程中是否考虑了非原始整数类型的java开销,这会给等式增加更多字节。然而,我也不确定这是否是分析storm性能的最佳方法,因为这更多的是用每秒元组的数量来衡量,而不是用带宽来衡量。
storm为基本类型、字符串、字节数组、arraylist、hashmap和hashset(源代码)内置了序列化。当我编写java以获得最大的性能时,我会尽可能地使用基本类型。是否可行 int[] 而不是 ArrayList<Integer> ? 我希望能从中获得一些性能,如果这在您的设置是可能的。
考虑到上面提到的storm能够立即序列化的类型,我很可能会避免尝试改进序列化性能。我假设kryo是非常优化的,在这里很难实现更快的目标。我也不确定序列化是否是这里真正的瓶颈,或者更确切地说是拓扑设置中的瓶颈(见下文)。
我将研究与工作人员内部和工作人员之间通信相关的其他可调参数。这里有一个很好的概述。在一个性能至关重要的拓扑中,我使用下面的设置代码来调整这些参数。什么在你的情况下最有效需要通过测试来发现。

int topology_executor_receive_buffer_size = 32768; // intra-worker messaging, default: 32768
int topology_transfer_buffer_size = 2048; // inter-worker messaging, default: 1000
int topology_producer_batch_size = 10; // intra-worker batch, default: 1
int topology_transfer_batch_size = 20; // inter-worker batch, default: 1
int topology_batch_flush_interval_millis = 10; // flush tuple creation ms, default: 1
double topology_stats_sample_rate = 0.001; // calculate metrics every 1000 messages, default: 0.05
conf.put("topology.executor.receive.buffer.size", topology_executor_receive_buffer_size);
conf.put("topology.transfer.buffer.size", topology_transfer_buffer_size);
conf.put("topology.producer.batch.size", topology_producer_batch_size);
conf.put("topology.transfer.batch.size", topology_transfer_batch_size);
conf.put("topology.batch.flush.interval.millis", topology_batch_flush_interval_millis);
conf.put("topology.stats.sample.rate", topology_stats_sample_rate);

正如您所注意到的,当storm能够使用内部工作进程时,性能会大大提高,因此我始终建议尽可能使用它。你确定需要分组吗?如果不是,我建议使用shufflegrouping,如果storm认为合适的话,它实际上会使用本地通信,除非 topology.disable.loadaware.messaging 设置为false。我不确定allgrouping是否会对同一个worker上的那些组件使用本地通信。
我想知道的另一件事是你的拓扑结构:你有10个喷口和4个消费螺栓。除非螺栓消耗传入元组的速度比创建元组的速度快得多,否则建议对两个组件使用相同的数量。从您描述流程的方式来看,似乎您使用了acking和failing,因为您编写了一个代码,为元组分配了一个id。在保证处理单个元组不是绝对要求的情况下,可以通过切换到未编排元组来获得性能。acking和failing确实会产生一些开销,因此如果关闭它,我会假设tuple吞吐量会更高。
最后,您还可以试验挂起元组的最大数量的值(通过方法setmaxspoutpending配置)。不确定storm使用的默认值是什么,但是根据我的经验,设置一个比螺栓可以吸收的下游值高一点的值可以提供更高的吞吐量。查看storm ui中的度量容量和传输的元组数。

相关问题