TensorFlow自定义循环训练模型:多GPU比单GPU慢

von4xj4u  于 2023-01-02  发布在  其他
关注(0)|答案(1)|浏览(224)

我正在使用:

  • tensorflow 2.6
  • CUDA 11.2
  • 4个GPU(GeForce®实时处理器3070)

TensorFlow使用Keras定义训练模型,多个GPU可以正常加速,但使用自定义循环训练模型时,batch_size(多GPU设置过大会导致内存溢出)设置与单GPU相同,模型训练速度比单GPU慢,如何解决?
我在谷歌上搜索了很多,但没有任何令人满意的解决方案。
这是我的一段代码。

with mirrored_strategy.scope():
    model = tf.keras.Model(input_data, bbox_tensors)
    optimizer = tf.keras.optimizers.Adam()
    ckpts = tf.train.Checkpoint(optimizer=optimizer, model=model)

def training(inputs):
    """training part"""
    image_data, labels = inputs
    # split data
    split_image = tf.split(image_data, 4, axis=0)
    split_label = tf.split(labels, 4, axis=0)

    out_split = []
    with tf.GradientTape() as tap:
        for i in range(4):
            predictions = model(split_image[i], training=True)
            tloss = compute_loss(predictions, split_label[i])
            out_split.append(tloss)
    tloss = tf.reduce_sum(tloss)
    gradients = tap.gradient(tloss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    return tloss

@tf.function
def distributed_training(dataset_inputs):
    per_replica_losses = mirrored_strategy.run(training, args=(dataset_inputs, ))
    return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

@Jirayu Kaewprateep这是我的数据集生成器。

class Dataset(object):
"""implement Dataset here"""
def __init__(self, dataset_type, model_type='csp'):
    self.annot_path = cfg.TRAIN.ANNOT_PATH if dataset_type == 'train' else cfg.VERIFY.ANNOT_PATH
    self.input_sizes = cfg.TRAIN.INPUT_SIZE if dataset_type == 'train' else cfg.VERIFY.INPUT_SIZE
    self.batch_size = cfg.TRAIN.BATCH_SIZE if dataset_type == 'train' else cfg.VERIFY.BATCH_SIZE
    self.data_aug = cfg.TRAIN.DATA_AUG if dataset_type == 'train' else cfg.VERIFY.DATA_AUG

    self.train_input_sizes = cfg.TRAIN.INPUT_SIZE
    if model_type == 'tiny':
        self.strides = np.array(cfg.YOLO.TINY_STRIDES)
    else:
        self.strides = np.array(cfg.YOLO.STRIDES)
    self.classes = utils.read_class_names(cfg.YOLO.CLASSES)
    self.num_classes = len(self.classes)
    self.anchors = np.array(utils.get_anchors(cfg.YOLO.ANCHORS))
    self.anchor_per_scale = cfg.YOLO.ANCHOR_PER_SCALE
    self.max_bbox_per_scale = 150

    self.annotations = self.load_annotations(dataset_type)
    self.num_samples = len(self.annotations)
    self.num_batchs = int(np.ceil(self.num_samples / self.batch_size))
    self.batch_count = 0
    self.train_input_size = self.train_input_sizes
    self.train_output_sizes = self.train_input_size // self.strides

def load_annotations(self, dataset_type):

    with open(self.annot_path, 'r') as f:
        txt = f.readlines()
        annotations = [line.strip() for line in txt if len(line.strip().split()[1:]) != 0]
    np.random.shuffle(annotations)
    return annotations

def __iter__(self):
    return self

def __next__(self):

    with tf.device('/cpu:0'):
        # self.train_input_size = random.choice(self.train_input_sizes)
        self.train_input_size = self.train_input_sizes
        self.train_output_sizes = self.train_input_size // self.strides

        batch_image = np.zeros((self.batch_size, self.train_input_size, self.train_input_size, 3))

        batch_label_sbbox = np.zeros((self.batch_size, self.train_output_sizes[0], self.train_output_sizes[0],
                                      self.anchor_per_scale, 5 + self.num_classes))
        batch_label_mbbox = np.zeros((self.batch_size, self.train_output_sizes[1], self.train_output_sizes[1],
                                      self.anchor_per_scale, 5 + self.num_classes))
        batch_label_lbbox = np.zeros((self.batch_size, self.train_output_sizes[2], self.train_output_sizes[2],
                                      self.anchor_per_scale, 5 + self.num_classes))

        batch_sbboxes = np.zeros((self.batch_size, self.max_bbox_per_scale, 4), dtype=np.float32)
        batch_mbboxes = np.zeros((self.batch_size, self.max_bbox_per_scale, 4), dtype=np.float32)
        batch_lbboxes = np.zeros((self.batch_size, self.max_bbox_per_scale, 4), dtype=np.float32)

        num = 0
        if self.batch_count < self.num_batchs:
            while num < self.batch_size:
                index = self.batch_count * self.batch_size + num
                if index >= self.num_samples: index -= self.num_samples
                annotation = self.annotations[index]
                image, bboxes = self.parse_annotation(annotation)
                label_sbbox, label_mbbox, label_lbbox, sbboxes, mbboxes, lbboxes = self.preprocess_true_boxes(bboxes)

                batch_image[num, :, :, :] = image
                batch_label_sbbox[num, :, :, :, :] = label_sbbox
                batch_label_mbbox[num, :, :, :, :] = label_mbbox
                batch_label_lbbox[num, :, :, :, :] = label_lbbox
                batch_sbboxes[num, :, :] = sbboxes
                batch_mbboxes[num, :, :] = mbboxes
                batch_lbboxes[num, :, :] = lbboxes
                num += 1
            self.batch_count += 1
            batch_starget = batch_label_sbbox, batch_sbboxes
            batch_mtarget = batch_label_mbbox, batch_mbboxes
            batch_ltarget = batch_label_lbbox, batch_lbboxes
            # print('batch_image_shape: ', batch_image.shape)
            # return batch_image, batch_label_sbbox, batch_label_mbbox, batch_label_lbbox, \
            #        batch_sbboxes, batch_mbboxes, batch_lbboxes
            return (batch_image, (batch_starget, batch_mtarget, batch_ltarget),)
        else:
            self.batch_count = 0
            np.random.shuffle(self.annotations)
            raise StopIteration

@staticmethod
def random_horizontal_flip(image, bboxes):
    if random.random() < 0.5:
        _, w, _ = image.shape
        image = image[:, ::-1, :]
        bboxes[:, [0, 2]] = w - bboxes[:, [2, 0]]

    return image, bboxes

@staticmethod
def random_crop(image, bboxes):
    if random.random() < 0.5:
        h, w, _ = image.shape
        max_bbox = np.concatenate([np.min(bboxes[:, 0:2], axis=0), np.max(bboxes[:, 2:4], axis=0)], axis=-1)

        max_l_trans = max_bbox[0]
        max_u_trans = max_bbox[1]
        max_r_trans = w - max_bbox[2]
        max_d_trans = h - max_bbox[3]

        crop_xmin = max(0, int(max_bbox[0] - random.uniform(0, max_l_trans)))
        crop_ymin = max(0, int(max_bbox[1] - random.uniform(0, max_u_trans)))
        crop_xmax = max(w, int(max_bbox[2] + random.uniform(0, max_r_trans)))
        crop_ymax = max(h, int(max_bbox[3] + random.uniform(0, max_d_trans)))

        image = image[crop_ymin: crop_ymax, crop_xmin: crop_xmax]

        bboxes[:, [0, 2]] = bboxes[:, [0, 2]] - crop_xmin
        bboxes[:, [1, 3]] = bboxes[:, [1, 3]] - crop_ymin

    return image, bboxes

@staticmethod
def random_translate(image, bboxes):
    if random.random() < 0.5:
        h, w, _ = image.shape
        max_bbox = np.concatenate([np.min(bboxes[:, 0:2], axis=0), np.max(bboxes[:, 2:4], axis=0)], axis=-1)

        max_l_trans = max_bbox[0]
        max_u_trans = max_bbox[1]
        max_r_trans = w - max_bbox[2]
        max_d_trans = h - max_bbox[3]

        tx = random.uniform(-(max_l_trans - 1), (max_r_trans - 1))
        ty = random.uniform(-(max_u_trans - 1), (max_d_trans - 1))

        M = np.array([[1, 0, tx], [0, 1, ty]])
        image = cv2.warpAffine(image, M, (w, h))

        bboxes[:, [0, 2]] = bboxes[:, [0, 2]] + tx
        bboxes[:, [1, 3]] = bboxes[:, [1, 3]] + ty

    return image, bboxes

def parse_annotation(self, annotation):
    line = annotation.split()
    # line = annotation.split(' ')
    image_path = line[0]
    if not os.path.exists(image_path):
        raise KeyError("%s does not exist ... " % image_path)
    image = np.array(cv2.imread(image_path))
    bboxes = np.array([list(map(lambda x: int(float(x)), box.split(','))) for box in line[1:]])

    if self.data_aug:
        image, bboxes = self.random_horizontal_flip(np.copy(image), np.copy(bboxes))
        image, bboxes = self.random_crop(np.copy(image), np.copy(bboxes))
        image, bboxes = self.random_translate(np.copy(image), np.copy(bboxes))

    image, bboxes = utils.image_preprocess(np.copy(image), [self.train_input_size, self.train_input_size],
                                           np.copy(bboxes))
    return image, bboxes

@staticmethod
def bbox_iou(boxes1, boxes2):
    boxes1 = np.array(boxes1)
    boxes2 = np.array(boxes2)

    boxes1_area = boxes1[..., 2] * boxes1[..., 3]
    boxes2_area = boxes2[..., 2] * boxes2[..., 3]

    boxes1 = np.concatenate([boxes1[..., :2] - boxes1[..., 2:] * 0.5,
                            boxes1[..., :2] + boxes1[..., 2:] * 0.5], axis=-1)
    boxes2 = np.concatenate([boxes2[..., :2] - boxes2[..., 2:] * 0.5,
                            boxes2[..., :2] + boxes2[..., 2:] * 0.5], axis=-1)

    left_up = np.maximum(boxes1[..., :2], boxes2[..., :2])
    right_down = np.minimum(boxes1[..., 2:], boxes2[..., 2:])

    inter_section = np.maximum(right_down - left_up, 0.0)
    inter_area = inter_section[..., 0] * inter_section[..., 1]
    union_area = boxes1_area + boxes2_area - inter_area

    return inter_area / (union_area + 1e-7)  #

def preprocess_true_boxes(self, bboxes):

    label = [np.zeros((self.train_output_sizes[i], self.train_output_sizes[i], self.anchor_per_scale,
                       5 + self.num_classes)) for i in range(3)]
    bboxes_xywh = [np.zeros((self.max_bbox_per_scale, 4)) for _ in range(3)]
    bbox_count = np.zeros((3,))

    for bbox in bboxes:
        bbox_coor = bbox[:4]
        bbox_class_ind = bbox[4]
        onehot = np.zeros(self.num_classes, dtype=np.float)
        onehot[bbox_class_ind] = 1.0

        uniform_distribution = np.full(self.num_classes, 1.0 / self.num_classes)
        deta = 0.01
        smooth_onehot = onehot * (1 - deta) + deta * uniform_distribution

        bbox_xywh = np.concatenate([(bbox_coor[2:] + bbox_coor[:2]) * 0.5, bbox_coor[2:] - bbox_coor[:2]], axis=-1)
        bbox_xywh_scaled = 1.0 * bbox_xywh[np.newaxis, :] / self.strides[:, np.newaxis]

        iou = []
        exist_positive = False
        for i in range(3):
            anchors_xywh = np.zeros((self.anchor_per_scale, 4))
            anchors_xywh[:, 0:2] = np.floor(bbox_xywh_scaled[i, 0:2]).astype(np.int32) + 0.5
            anchors_xywh[:, 2:4] = self.anchors[i]

            iou_scale = self.bbox_iou(bbox_xywh_scaled[i][np.newaxis, :], anchors_xywh)
            iou.append(iou_scale)
            iou_mask = iou_scale > 0.3

            if np.any(iou_mask):
                xind, yind = np.floor(bbox_xywh_scaled[i, 0:2]).astype(np.int32)

                label[i][yind, xind, iou_mask, :] = 0
                label[i][yind, xind, iou_mask, 0:4] = bbox_xywh
                label[i][yind, xind, iou_mask, 4:5] = 1.0
                label[i][yind, xind, iou_mask, 5:] = smooth_onehot

                bbox_ind = int(bbox_count[i] % self.max_bbox_per_scale)
                bboxes_xywh[i][bbox_ind, :4] = bbox_xywh
                bbox_count[i] += 1

                exist_positive = True

        if not exist_positive:
            best_anchor_ind = np.argmax(np.array(iou).reshape(-1), axis=-1)
            best_detect = int(best_anchor_ind / self.anchor_per_scale)
            best_anchor = int(best_anchor_ind % self.anchor_per_scale)
            xind, yind = np.floor(bbox_xywh_scaled[best_detect, 0:2]).astype(np.int32)

            label[best_detect][yind, xind, best_anchor, :] = 0
            label[best_detect][yind, xind, best_anchor, 0:4] = bbox_xywh
            label[best_detect][yind, xind, best_anchor, 4:5] = 1.0
            label[best_detect][yind, xind, best_anchor, 5:] = smooth_onehot

            bbox_ind = int(bbox_count[best_detect] % self.max_bbox_per_scale)
            bboxes_xywh[best_detect][bbox_ind, :4] = bbox_xywh
            bbox_count[best_detect] += 1
    label_sbbox, label_mbbox, label_lbbox = label  # label_xbbox--> shape (52, 52, 3, 9)
    sbboxes, mbboxes, lbboxes = bboxes_xywh  # bboxes--> shape (150, 4)

    return label_sbbox, label_mbbox, label_lbbox, sbboxes, mbboxes, lbboxes

def __len__(self):
    return self.num_batchs
6yjfywim

6yjfywim1#

可能性有很多:
1.输入数据集。多个单元跨设备目标工作,具有各自的处理速度和来自分发服务器的问题。它们具有同步和异步模式。您应用了config = tf.config.experimental.set_synchronous_execution(False)
1.自定义循环模式的意思是执行保证模式,您需要用程序而不是model.fit()或估计器函数来处理这个过程。
1.输入示例中看到的数据和标签。您需要自己处理数据输入,甚至使用estimator()。
Distribution training
示例:使用TensorFlow Keras的简单应用程序model.fit()。数据集需要注意处理。

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

physical_devices = tf.config.experimental.list_physical_devices('CPU')
default_strategy = tf.distribute.get_strategy()
print( default_strategy )

config = tf.config.experimental.set_synchronous_execution( False )
print( tf.config.experimental.get_synchronous_execution() )

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')

dataset = tf.data.Dataset.from_tensor_slices((tf.constant([1, 2, 3, 4], shape=(1, 4)), tf.constant([1], shape=(1, 1))))
history = model.fit( dataset, epochs=10000 )

input('...')

输出:使用共享策略,您可以对60%、40%或80%的工作量进行估计20%或回退0

2022-12-07 13:21:30.873778: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy
as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_INT32
      type: DT_INT32
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 4
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT32
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT32
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT32
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT32
        }
      }
    }
  }
}

Epoch 1/10000
1/1 [==============================] - 2s 2s/step - loss: 5.8720
Epoch 2/10000
1/1 [==============================] - 0s 8ms/step - loss: 4.1545
Epoch 3/10000
1/1 [==============================] - 0s 8ms/step - loss: 2.9623
Epoch 4/10000
1/1 [==============================] - 0s 8ms/step - loss: 2.1346
Epoch 5/10000
1/1 [==============================] - 0s 8ms/step - loss: 1.5598

steps: 156
x_value -1.3065915
y_value -0.23498479
v 123839330.0
steps: 157
x_value 1.1961238
y_value -0.055203147
v 123832690.0
steps: 158
x_value -0.04365039
y_value 0.4533396
v 123826070.0
steps: 159
x_value 0.0
y_value 0.15724461
v 123819460.0

示例:在单点坐标值求取中的应用.

step: 000004 action: 6 coff_0: -00002 coff_1: -00001 coff_2: 000015 coff_3: 000223 coff_4: 000089 epsilon: False
step: 000005 action: 6 coff_0: 000000 coff_1: 000004 coff_2: 000020 coff_3: 000218 coff_4: 000085 epsilon: False
step: 000006 action: 1 coff_0: 000002 coff_1: 000008 coff_2: 000024 coff_3: 000214 coff_4: 000081 epsilon: False
step: 000007 action: 6 coff_0: 000004 coff_1: 000011 coff_2: 000027 coff_3: 000211 coff_4: 000077 epsilon: False
step: 000008 action: 1 coff_0: 000006 coff_1: 000013 coff_2: 000029 coff_3: 000209 coff_4: 000073 epsilon: False
step: 000009 action: 6 coff_0: 000008 coff_1: 000014 coff_2: 000030 coff_3: 000208 coff_4: 000069 epsilon: False
step: 000010 action: 1 coff_0: 000010 coff_1: 000014 coff_2: 000030 coff_3: 000208 coff_4: 000065 epsilon: False
step: 000011 action: 6 coff_0: 000012 coff_1: 000013 coff_2: 000029 coff_3: 000209 coff_4: 000061 epsilon: False

相关问题