pandas: How can I convert a .csv file to a .arrow file without loading it all into memory?

dzhpxtsq  posted 2023-03-16  in Other
Follow (0) | Answers (3) | Views (421)

I found a similar question here: Read CSV with PyArrow

That answer refers to sys.stdin.buffer and sys.stdout.buffer, but I'm not sure how to actually write the .arrow file, or what to name it. I can't find exactly what I'm looking for in the pyarrow documentation. My file has no NaNs, but it does have a timestamp index. The file is about 100 GB, so loading it into memory is simply not an option. I tried modifying the code, but as I suspected, it overwrites the previous file on every pass of the loop.

*** This is my first post. I'd like to thank all the contributors who answered 99.9% of my other questions before I ever had to ask them.

import sys

import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1     ### used one line chunks for a small test

def main():
    writer = None
    for split in pd.read_csv(sys.stdin.buffer, chunksize=SPLIT_ROWS):

        table = pa.Table.from_pandas(split)
        # Write out to file
        with pa.OSFile('test.arrow', 'wb') as sink:     ### no append mode yet
            with pa.RecordBatchFileWriter(sink, table.schema) as writer:
                writer.write_table(table)
    writer.close()

if __name__ == "__main__":
    main()

Here is what I use on the command line:

>cat data.csv | python test.py

anhgbhbe1#

As @Pace suggested, you should consider moving the creation of the output file out of the reading loop, for example:

import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1     ### used one line chunks for a small test

def main():
    chunks = pd.read_csv('data.csv', chunksize=SPLIT_ROWS)
    # pull the first chunk up front so the schema is known before the writer is created
    first_table = pa.Table.from_pandas(next(chunks))

    # open the output file once, outside the reading loop
    with pa.OSFile('test.arrow', 'wb') as sink:     ### no append mode yet
        with pa.RecordBatchFileWriter(sink, first_table.schema) as writer:
            writer.write_table(first_table)
            for split in chunks:
                table = pa.Table.from_pandas(split)
                writer.write_table(table)

if __name__ == "__main__":
    main()

You also don't need sys.stdin.buffer if you'd rather specify particular input and output files. You would then run the script as:

python test.py

By using the with statements, both writer and sink are closed automatically afterwards (in this case when main() returns), so the explicit close() calls aren't needed.
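
As a quick sanity check, the resulting IPC file can be opened with memory mapping, so it can be inspected batch by batch without pulling 100 GB into RAM. A minimal sketch, assuming the test.arrow written above:

import pyarrow as pa

# memory-map the IPC file so record batches are only materialised when accessed
with pa.memory_map('test.arrow') as source:
    reader = pa.ipc.open_file(source)
    print(reader.num_record_batches)     # how many chunks were written
    print(reader.get_batch(0).schema)    # reads just the first record batch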


nue99wik2#

Solution adapted from @Martin-Evans' code:
Close the file after the for loop, as suggested by @Pace

import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1000000

def main():
    ### read only the first two rows to define the schema
    schema = pa.Table.from_pandas(pd.read_csv('Data.csv', nrows=2)).schema

    with pa.OSFile('test.arrow', 'wb') as sink:
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for split in pd.read_csv('Data.csv', chunksize=SPLIT_ROWS):
                table = pa.Table.from_pandas(split)
                writer.write_table(table)

            ### explicit close after the loop (the with-statement would also close it)
            writer.close()

if __name__ == "__main__":
    main()
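
One caveat with inferring the schema from only two rows: pandas may infer a different dtype for a later chunk (for example a column that switches from int to float), and the writer will then raise a schema mismatch error. Pinning the dtypes in read_csv keeps every chunk consistent. A hedged sketch with hypothetical column names time, deviceid and value:

import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1000000

# hypothetical column names and dtypes -- adjust to the real CSV header
READ_ARGS = dict(dtype={'deviceid': 'string', 'value': 'float64'},
                 parse_dates=['time'])

schema = pa.Table.from_pandas(pd.read_csv('Data.csv', nrows=2, **READ_ARGS)).schema

with pa.OSFile('test.arrow', 'wb') as sink:
    with pa.RecordBatchFileWriter(sink, schema) as writer:
        for split in pd.read_csv('Data.csv', chunksize=SPLIT_ROWS, **READ_ARGS):
            # cast every chunk against the pinned schema before writing
            writer.write_table(pa.Table.from_pandas(split, schema=schema))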

lx0bsm1f3#

In 2023 you don't need pandas for this. You can chunk through the CSV with Arrow itself:

import pyarrow as pa
from pyarrow import csv

schema =  pa.schema([
        ('time', pa.timestamp('ms', None)),
        ('deviceid', pa.utf8())
])
convert_dict = {
  'time': pa.timestamp('ms', None),
  'deviceid': pa.utf8()
}
convert_options = pa.csv.ConvertOptions(
    column_types=convert_dict,
    strings_can_be_null=True,
    quoted_strings_can_be_null=True,
    timestamp_parsers=["%Y-%m-%d %H:%M:%S"],
)

arrowfile = "data_dst.arrow"
csvfile = "data_src.csv"

with pa.OSFile(arrowfile, 'wb') as sink:     ### no append mode yet
    with pa.csv.open_csv(csvfile, convert_options=convert_options) as reader:
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for next_chunk in reader:
                # each chunk streamed from the CSV is a RecordBatch
                next_table = pa.Table.from_batches([next_chunk])
                writer.write_table(next_table)
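
Since open_csv streams RecordBatch objects and already knows the schema, the batches can also be handed to the writer directly, without wrapping each one in a Table. A small variation on the loop above, under the same assumptions about column names:

import pyarrow as pa
from pyarrow import csv

convert_options = csv.ConvertOptions(
    column_types={'time': pa.timestamp('ms'), 'deviceid': pa.utf8()},
    timestamp_parsers=["%Y-%m-%d %H:%M:%S"],
)

with pa.OSFile("data_dst.arrow", 'wb') as sink:
    with csv.open_csv("data_src.csv", convert_options=convert_options) as reader:
        # reader.schema is known up front, so the writer can be created immediately
        with pa.RecordBatchFileWriter(sink, reader.schema) as writer:
            for batch in reader:
                # each streamed chunk is already a RecordBatch; write it directly
                writer.write_batch(batch)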
