Tensorflow 2 - Using "tf.data.experimental.make_csv_dataset" with "tf.keras.preprocessing.timeseries_dataset_from_array"

vxqlmq5t · posted 2022-11-13 in Other

I am trying to get TensorFlow to read 100+ CSV files that do ***not*** fit in memory (each is ~1 GB in size). The files contain time-series data (EEG signals), with the labels in the first column. From the TensorFlow documentation, I should be able to load my data from disk using the tf.data API.
For simplicity and reproducibility, let's consider the following 'sample_data.csv' dataset:
| Label | Feature 1 | Feature 2 |
| --- | --- | --- |
| Apple | 1 | 2 |
| Banana | 3 | 4 |
| Coconut | 5 | 6 |
| Durian | 7 | 8 |
I have tried using tf.data.experimental.make_csv_dataset to load the CSV files into a tf.data.Dataset object, and then tf.keras.preprocessing.timeseries_dataset_from_array to slice the data into overlapping sliding windows.

import tensorflow as tf

input_data = tf.data.experimental.make_csv_dataset(
    'sample_data.csv',
    batch_size=1,
    column_names=['Label', 'Feature 1', 'Feature 2'],
    label_name='Label',
    num_epochs=1,
    shuffle=False
)
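
As a quick sanity check (a minimal snippet, assuming the dataset above was created successfully), the first parsed record can be printed like this:

# Peek at the first (features, label) pair returned by make_csv_dataset
for features, label in input_data.take(1):
    print(label.numpy())
    print({name: value.numpy() for name, value in features.items()})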

We can verify that input_data works by looking at the output of list(input_data.as_numpy_iterator()). We can then feed input_data into the next function:

my_dataset = tf.keras.preprocessing.timeseries_dataset_from_array(
    input_data,
    targets=None,
    sequence_length=3,
    sequence_stride=2,
    sampling_rate=1,  
    batch_size=1,
    shuffle=False
)

Unfortunately, this throws the following error (presumably because timeseries_dataset_from_array expects an indexable array rather than a tf.data.Dataset):
TypeError: The dataset length is unknown.
I also tried my_dataset = input_data.window(3, shift=2) (see the tf.data.Dataset.window documentation). It did not throw an error, but it seems to return an empty dataset? See the "_VariantDataset shapes: (None,)" entries in the output:

list(input_data.window(3, shift=2))

Out[344]:
[(OrderedDict([('Feature 1',
                <_VariantDataset shapes: (None,), types: tf.int32>),
               ('Feature 2',
                <_VariantDataset shapes: (None,), types: tf.int32>)]),
  <_VariantDataset shapes: (None,), types: tf.string>),
 (OrderedDict([('Feature 1',
                <_VariantDataset shapes: (None,), types: tf.int32>),
               ('Feature 2',
                <_VariantDataset shapes: (None,), types: tf.int32>)]),
  <_VariantDataset shapes: (None,), types: tf.string>),
 (OrderedDict([('Feature 1',
                <_VariantDataset shapes: (None,), types: tf.int32>),
               ('Feature 2',
                <_VariantDataset shapes: (None,), types: tf.int32>)]),
  <_VariantDataset shapes: (None,), types: tf.string>)]
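
From what I can tell, window() produces a dataset of nested datasets rather than tensors, so each inner _VariantDataset would have to be flattened before use. A rough sketch of what that flattening might look like (just an assumption on my part; it packs the features into a single tensor and drops the label for brevity, and the names packed/windows are only for illustration):

# Pack the per-column feature tensors into one tensor and undo the batch_size=1 batching
packed = input_data.map(
    lambda features, label: tf.stack(
        [tf.cast(v, tf.float32) for v in features.values()], axis=1))
packed = packed.unbatch()

# window() yields nested datasets; flat_map + an inner batch turns each window into a (3, n_features) tensor
windows = packed.window(3, shift=2, drop_remainder=True)
windows = windows.flat_map(lambda w: w.batch(3))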

If I load 'sample_data.csv' into memory with pandas and then pass the resulting NumPy array to the timeseries_dataset_from_array function, it works fine.
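
For reference, the in-memory version that does work looks roughly like this (a sketch, using the column names from the table above):

import pandas as pd

df = pd.read_csv('sample_data.csv')
feature_array = df[['Feature 1', 'Feature 2']].to_numpy()

my_dataset = tf.keras.preprocessing.timeseries_dataset_from_array(
    feature_array,
    targets=None,
    sequence_length=3,
    sequence_stride=2,
    sampling_rate=1,
    batch_size=1,
    shuffle=False
)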

What is the best way to feed overlapping windows from out-of-memory time-series data into TensorFlow?

Thank you!


tnkciper 1#

The code below works, but it is slow and the GPU is underutilized. There is an extra column (file_id) used to prevent a window from containing rows from two different files; for many problems it could be removed. There are three data columns: feature1, feature2, feature3. The code returns windows of feature1 and feature2 plus a single point from feature3, but it can easily be adjusted. It is set up to predict the value of feature3 from feature1 and feature2.

import numpy as np
import tensorflow as tf
from random import shuffle

def pack_features_vector(features):
    # Stack the OrderedDict of per-column tensors into a single float32 feature matrix
    features = tf.stack([tf.cast(x, tf.float32) for x in list(features.values())], axis=1)
    return features

def read_dataset(file_path, min_array, max_array, shuffle_enabled=False, batch_size = 512, window_size=50, for_forecast=False,make_sequences=True):
    
    if min_array is None or max_array is None:
        scaling_on = False
    else:
        scaling_on = True
    
    
    if scaling_on:
        # Required for min-max normalization: (X - min_X) / (max_X - min_X). The first element is a
        # unique file id that is not normalized; it is only used to filter out windows that mix data
        # from different files/units.
        max_np = np.insert(max_array,0,1)
        min_np = np.insert(min_array,0,0)
        min_list = list(min_np)

        denominator_list = list(max_np-min_np) # Need list that will be used to divide element wise for Normalization
    
    # Shuffle files
    if shuffle_enabled:
        shuffle(file_path)
    
    # Read the CSV files (cannot read in parallel, as that would interleave samples from different files)
    building_dataset = tf.data.experimental.make_csv_dataset(file_pattern=file_path,
                                                        batch_size=1024,num_epochs=1, shuffle=False,
                                                        select_columns=['file_id','feature1','feature2','feature3'],
                                                        num_parallel_reads=1)
    
   
    building_dataset = building_dataset.map(pack_features_vector)
    building_dataset = building_dataset.flat_map(lambda x: tf.data.Dataset.from_tensor_slices(x))

    if scaling_on:
        building_dataset = building_dataset.map(lambda x: (x - min_list) / denominator_list)  # element-wise min-max scaling
     
    # Make overlapping windows and drop those that span more than one file
    if make_sequences:
        building_dataset = building_dataset.window(window_size, shift=1, drop_remainder=True)
        building_dataset = building_dataset.flat_map(lambda window: window.batch(window_size))

        building_dataset = building_dataset.filter(lambda window: tf.reduce_sum(window[:-1, 0] - window[1:, 0]) == 0)  # keep only windows from a single file_id (column 0); no crossing between files

        
        if not for_forecast:
            # Inputs: the feature columns of the whole window; target: feature3 at the middle of the window
            building_dataset = building_dataset.map(lambda window: (window[:, 1:-1], window[-(window_size // 2), -1]))
        else:
            building_dataset = building_dataset.map(lambda window: window[:,1:-1])
    else:
        building_dataset = building_dataset.map(lambda x: x[1:])  # drop the file_id column

    if shuffle_enabled:
        building_dataset = building_dataset.shuffle(1000)
    
    building_dataset = building_dataset.batch(batch_size)
    
    building_dataset = building_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
    
    return building_dataset
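
A possible way to call it (a hypothetical usage sketch; the file pattern, min/max arrays and window size below are placeholders, and 'model' is assumed to be an existing tf.keras model):

import glob

csv_files = glob.glob('eeg_data/*.csv')        # placeholder file pattern
min_array = np.array([0.0, 0.0, 0.0])          # per-feature minima for feature1..feature3 (placeholders)
max_array = np.array([1.0, 1.0, 1.0])          # per-feature maxima (placeholders)

train_dataset = read_dataset(csv_files, min_array, max_array,
                             shuffle_enabled=True, batch_size=512, window_size=50)

model.fit(train_dataset, epochs=10)            # 'model' assumed to be defined elsewhere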
