.net 以流式方式将大型数组写入parquet?

u3r8eeie  于 2023-06-25  发布在  .NET
关注(0)|答案(1)|浏览(125)

bounty将在19小时后到期。此问题的答案有资格获得+50声望奖励。Niels Harremoes希望引起更多关注这个问题。

我们使用ParquetSharp。我们有一个看起来像这样的模式(ParquetSharp c#格式):

new Column[] {
  new Column<string>("measurement_name"),
  new Column<DateTime?>("timestamp"),
  new Column<float?[]>("data")
}

我们有时需要向数据列写入一个大型数据集-大约2.5亿个值。
我们能以流的方式写条目吗?我们以IEnumerable<float?>的形式获取数据,但目前我们必须在写入之前将其转换为float?[],这会导致OutOfMemory错误。
我们目前是这样写的:

void WriteData(ParquetFileWriter writer1, IList<MyDataset> dataSets)
{

  using var rg1 = writer1.AppendRowGroup();
  using (var colWriter = rg1.NextColumn().LogicalWriter<string>())
  {
    colWriter.WriteBatch(dataSets.Select(c => c.MeasurementName).ToArray());
  }
  using (var colWriter = rg1.NextColumn().LogicalWriter<DateTime?>())
  {
    colWriter.WriteBatch(dataSets.Select(c => c.DateTime).ToArray());
  }
  using (var colWriter = rg1.NextColumn().LogicalWriter<float?[]>())
  {
    colWriter.WriteBatch(dataSets.Select(c => c.Data).ToArray());
  }
}

其中MyDataset.Data是float?[]属性。但我们想将其转换为IEnumerable<float?>-或类似?
当然,我们可以通过转换为数据集流并为每个数据集编写一个新的行组来减少内存使用,但有时我们只会得到一个非常大的数据集。

yyyllmsg

yyyllmsg1#

我不太确定这是否是你正在寻找的,但也许使用静态扩展可能会有所帮助?

public static class DataSetExtensions
{

    private static async IAsyncEnumerable<List<TModel>> FetchPartially<TModel>(this IList<TModel> data, int batchSize = 10)
    {
        var count = data.Count();
        var batches = (int)Math.Ceiling((double)count / batchSize);
        for (var i = 0; i < batches; i++)
        {
            var skip = i * batchSize;
            var take = Math.Min(batchSize, count - skip);
            yield return data.Skip(skip).Take(take).ToList();
        }
    }
}

用法:

var yourlargeDataset = new List<MyDataSet>();
await foreach(var smallDataset in yourlargeDataset.FetchPartially(100)) //100 items per time
{
   WriteData(riter1, smallDataset );
}

相关问题