tensorflow 将“tf.data.dataset”中的数据分发给多个工作进程(例如Horovod)

np8igboo  于 2022-12-27  发布在  其他
关注(0)|答案(2)|浏览(163)

使用Horovod,您基本上运行N个独立的示例(因此这是一种“图间复制”形式),它们通过特殊的Horovod操作(基本上是广播+减少)进行通信。
现在假设示例0或其他外部示例加载了数据(通过tf.data.Dataset),那么如何将iterator.get_next()分发到每个示例呢?使用Horovod广播将效率低下,因为您需要将所有数据复制到所有示例。
在每个示例中都有数据集,并在其中执行所有加载,然后在数据集上使用shard也会效率低下,因为您将在所有地方加载数据,然后丢弃(N-1)/N。这就是为什么您也不希望进行分片,而是仅在单个(生产者/数据集工作者)示例,然后该示例将批分发给所有列车工作者。
我猜TF MultiDeviceIterator提供了一些类似的功能(或者基本上就是这样),但我不确定它是否能与Horovod一起工作,以及如何设置它?
或者您可以通过TF工作者(guide)进行分发(您也可以这样配置MultiDeviceIterator)。
如有可能,应通过TensorFlow操作/函数执行此操作(有很多相关功能可能已经给予了这些功能,但我可能不了解它们,或者误解了它的工作原理)。或者答案是TensorFlow还没有提供任何此类功能?(了解这一点还是很有用的。然后,我会用C++实现自己的解决方案,并将其 Package 为TensorFlow操作。但在此之前,最好了解一下这是否真的有必要。
(与之相关的还有this Horovod issue。)
(This这个问题实际上比Horovod更一般,尽管Horovod可能是一个很好的例子。您可能在分布式TensorFlow中总是遇到这个问题?)
(我收集了所有分布式TensorFlow术语和方面here的概述,主要是为了澄清。
(相关的问题(也许?)还有thisthisthisthisthis。)

wwtsj6pe

wwtsj6pe1#

正如您所说,复制每个示例中的数据并对每个示例的数据进行分片是不切实际的。
一种解决方案是将数据流程中的数据分开,并让每个示例从数据流程中提取数据,如下图所示。例如,可以使用队列建立此通信。
在这样的系统中,数据处理将加载数据集,将其预处理为批处理,并将批处理推入队列。然后,每个训练示例将从该队列中提取批处理。例如,您可以将队列作为生成器传递到数据集API中(请参阅tf.data.Dataset.from_generator)。此外,如果批处理的生成速度不够快,则可以创建更多的数据进程来提高批处理吞吐量。
具体的实现细节将因使用情况而异。有关详细信息,可以查找Networking and Interprocess communication和多处理管道和队列。

Training        
                                                         +--------------+  ++
                                                         |              |   |
                                                    +----+  Instance 1  |   |
                                                    |    |              |   |
                                                    |    +--------------+   |
                                                    |                       |
                      Preprocessing                 |                       |
                  +--------------------+            +---->      X           |
                  |                    |            |                       |
             Load |                    | Batches    +           X           |
    Dataset+------>    Data Process    +--------->Queue                     |  N instances
                  |                    |            +           X           |  Distributed training
                  |                    |            |                       |  For example, using
                  +--------------------+            +---->      X           |  Horovod broadcast + reduce
                                                    |                       |
                                                    |        Training       |
                                                    |    +--------------+   |
                                                    |    |              |   |
                                                    +----+  Instance N  |   |
                                                         |              |   |
                                                         +--------------+  ++

对于tensorflow 实现,可以将tf.data.Dataset.shardtf.data.TFRecordDataset一起使用。
该文档解决了您对使用TFRecords时效率低下的担忧:
重要警告:

  • 确保在使用任何随机化操作符(如随机播放)之前进行分片。
  • 通常,最好在数据集管道的早期使用shard运算符。例如,从一组TFRecord文件中阅读时,请在将数据集转换为输入样本之前进行shard。这样可以避免读取每个工作进程上的每个文件。下面是完整管道中有效的分片策略示例:
d = Dataset.list_files(pattern)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
bwleehnv

bwleehnv2#

我想重新看一下YogaDL,它允许缓存数据集,这样在训练(或重新训练)期间,您将只访问该碎片上需要的数据,而不是丢弃(N-1)/N个数据读取。

相关问题