加速风暴拓扑

vql8enpb  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(360)

我们有一个将csv文件从hdfs移动到hive的应用程序。我们在这个过程中使用了风暴拓扑。
8台机器已投入使用。每个处理器都有22个内核和512GB内存。但是,我们的代码运行非常慢。传输600万个数据需要10分钟。
10 mb的60个文件正在一秒钟内传输到hdfs。我们试图优化我们的代码,但很明显,我们正在做一些非常错误的事情。
对于hive表,我们有64个桶。
在我们的拓扑结构中,我们有1个喷口和2个螺栓。基本上,我们的spout获取csv文件,向第一个bolt发送行,第一个bolt负责解析数据,然后bolt向第二个bolt发送行,第二个bolt负责hdfs进程。
hdfs喷口;

HdfsSpout hdfsSpout = new HdfsSpout()
    .withOutputFields(TextFileReader.defaultFields)
    .setReaderType("text")
    .setHdfsUri(hdfsUri)
    .setSourceDir("/data/in")
    .setArchiveDir("/data/done")
    .setBadFilesDir("/data/bad")
    .setClocksInSync(true) // NTP installed on all hosts
    .setIgnoreSuffix("_COPYING_") 
// do not begin reading file until it is completely copied to HDFS
    .setMaxOutstanding(50_000);

制图员;

DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
    .withColumnFields(new Fields(TTDPIRecord.fieldsList))
    .withPartitionFields(new Fields(TTDPIRecord.partitionFieldsList));

配置单元选项;

HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
    .withAutoCreatePartitions(true)
    .withHeartBeatInterval(3)
    .withCallTimeout(10_000) // default = 10.000
    .withTxnsPerBatch(2)
    .withBatchSize(50_000) 
// doing below because its affecting storm metrics most likely
    .withTickTupleInterval(1);

配置;

Config conf = new Config();
conf.setNumWorkers(6);
conf.setNumAckers(6);
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);

拓扑生成器;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("hdfsSpout", hdfsSpout, 8);
builder.setBolt("recordParserBolt", recordParserBolt, 8).localOrShuffleGrouping("hdfsSpout");
builder.setBolt("hiveBolt", hiveBolt, 8).localOrShuffleGrouping("recordParserBolt");

我们不确定以下参数;
在hdfs喷口中。setmaxoutstanding(5万欧元);
Hive内喷口选项。带TXNSperbatch(2)。带BatchSize(50\u 000)。带TickTupleInterval(1);
在config;中。员工(6人)。塞努麦克斯(6);
喷口和螺栓的平行度;我们给每人8英镑。
这些参数的值应该是多少?提前谢谢。
编辑;这是我们对100个csv文件的10mb的测试结果;
hdfsspout执行器:8个完整延迟:1834.209毫秒
recordparserbolt执行器:8个完整延迟:0.019毫秒
hivebolt执行器:8个完整延迟:1092.624毫秒

dfddblmv

dfddblmv1#

你在做什么 conf.setNumWorkers(6); 也就是说,您只使用8台机器中的6台,您可以将其设置为8以使用所有硬件。
您可以更改的另一个参数是螺栓的并行性提示,这意味着组件的执行器(线程)的初始数目。如果只将并行度设置为8,则可以将其增加到100/200,并查看性能如何变化。
您可以通过这个来了解并行性在storm中是如何工作的。
你也能告诉什么是你的配置最大喷口挂起?

相关问题