恢复巨大的Pandas Dataframe抛出ArrayMemoryError

jdg4fx2g  于 2023-11-15  发布在  其他
关注(0)|答案(1)|浏览(133)

在工作中,我们有一台机器,它记录一些传感器值(例如温度,压力,速度等)。我有一个csv文件,其中包含这些值,大约在600万到700万行和13列之间。然而,记录器不会在每行中写入每个传感器。只有第一行和最后一行有每个传感器的值。中间的行确实有很多NaN,行之间的时间步长不是恒定的。这里有几行给予给你一个印象:

  1. 2017-04-01 00:00:00.000,56.5611838935852,448,80.12215897219313,448,37.12826156616211,448,19.152511596679688,448,6516.885666666667,448,2.9019691567718984,448
  2. 2017-04-01 00:00:00.343,,,,,,,,,6517,704,,
  3. 2017-04-01 00:00:00.687,,,,,,,,,,,2.9009628295898438,704
  4. 2017-04-01 00:00:02.670,,,,,,,19.152511596679688,192,,,,
  5. 2017-04-01 00:00:03.343,,,,,,,,,6524,192,,
  6. 2017-04-01 00:00:03.453,56.55547332763672,192,,,,,,,,,,

字符串
我们希望使用这些值来输入动态模拟以检查是否一切正常。我们使用的工具需要恒定的时间步长,并且在每一步中,所有值都必须作为输入。
所以为了解决这个问题,我把CSV读成了一个pandas帧,重采样到1ms(因为csv中的时间戳具有毫秒级的精度),对缺失的值进行插值,然后重新采样为1s,这是我们动态模拟所需的时间步长,并保持较低的内存和磁盘空间需求。毫不奇怪,有了这么多的数据,我已经在重新加载到1ms的步骤中耗尽了内存。
我确实阅读了pandas用户指南中关于Scaling to large datasets的内容,并取得了一些成功。我加载了更少的列,使用了高效的数据库,并使用了分块。只有使用分块,我才能处理整个子帧,但花了我大约1.5个小时,这在生产环境中是不可接受的。由于块是在for循环中处理的,处理是串行的。我读了关于Dask的部分,认为并行化可以加快处理速度,但不知何故,我又遇到了内存问题。
以下是我尝试的:

  1. raw_df = pandas_df
  2. upsampled_df = raw_df.resample("1ms").asfreq()
  3. dask_df = dask.dataframe.from_pandas(upsampled_df, chunksize=chunk_size)
  4. interpolated_df = dask_df.map_partitions(lambda df: df.interpolate()).compute()
  5. downsampled_df = interpolated_df.resample("1s").asfreq()


下面是upsampled_df = raw_df.resample("1ms").asfreq()抛出的错误消息:
numpy.core._exceptions._ArrayMemoryError:无法为具有形状(7862399001,)和数据类型int64的数组分配58.6 GiB
问题是(我认为)我仍然必须使用pandas来重采样该对象,因为Dask没有提供任何等价于asfreq()的函数来返回重采样的对象。dask_df.resample("1ms")可以工作,但返回一个Resampler对象,我不能在之后插值。
你会如何处理这个问题?我知道,特别是在科学它的相当常见,有巨大的点阵需要处理,但我真的找不到任何有用的在互联网上,解决我的问题。
请记住,我既不是Pandas,也不是pythonMaven,所以很有可能我错过了一些明显的东西。

pkbketx9

pkbketx91#

重采样是基于时间的groupby,所以在开始时不需要扩展帧,可以使用apply按块(频率)处理帧
如果我理解正确的话,如果我们将数据按天划分,
例如,开始日期范围为9天

  1. import pandas as pd
  2. index = pd.date_range('1/1/2000', periods=9, freq='D')
  3. series = pd.Series(range(9), index=index)

字符串
在每天的块上应用实际的UDF操作。对于每个块(天)。它将重新采样到1ms(lambda函数)

  1. series.resample('D').apply(lambda x: x.resample('1ms').asfreq().interpolate().resample('1s').asfreq())


或30分钟

  1. series.resample('30T').apply(lambda x: x.resample('1ms').asfreq().interpolate().resample('1s').asfreq())


当我尝试直接重采样到1ms时,我确实有内核死亡(内存问题)
第一个月

展开查看全部

相关问题