根据行值拆分大型CSV文件

yizd12fk  于 2024-01-03  发布在  其他
关注(0)|答案(3)|浏览(200)

问题
我有一个名为data.csv的csv文件。在每一行上我有:

timestamp: int
account_id: int
data: float

字符串
例如:

timestamp,account_id,value
10,0,0.262
10,0,0.111
13,1,0.787
14,0,0.990

  • 此文件按时间戳排序。
  • 行数太大,无法在内存中存储所有行。
  • 数量级:100 M行,帐户数:5 M
    如何快速获取给定account_id的所有行?让account_id访问数据的最佳方法是什么?

Things I tried

生成样本:

N_ROW = 10**6
N_ACCOUNT = 10**5

# Generate data to split
with open('./data.csv', 'w') as csv_file:
  csv_file.write('timestamp,account_id,value\n')
  for timestamp in tqdm.tqdm(range(N_ROW), desc='writing csv file to split'):
    account_id = random.randint(1,N_ACCOUNT)
    data = random.random()
    csv_file.write(f'{timestamp},{account_id},{data}\n')

# Clean result folder
if os.path.isdir('./result'):
  shutil.rmtree('./result')
os.mkdir('./result')

方案一

编写一个脚本,为每个帐户创建一个文件,在原始csv上逐个读取行,在与帐户对应的文件上写入行(为每行打开和关闭一个文件)。

验证码:

# Split the data
p_bar = tqdm.tqdm(total=N_ROW, desc='splitting csv file')
with open('./data.csv') as data_file:
  next(data_file) # skip header
  for row in data_file:
    account_id = row.split(',')[1]
    account_file_path = f'result/{account_id}.csv'
    file_opening_mode = 'a' if os.path.isfile(account_file_path) else 'w'
    with open(account_file_path, file_opening_mode) as account_file:
      account_file.write(row)
    p_bar.update(1)

问题:

它是相当慢的(我认为这是低效的打开和关闭一个文件上的每一行).它需要大约4分钟的1 M行.即使它的工作,它会快吗?给定一个account_id我知道我应该读的文件的名称,但系统必须查找超过5 M文件找到它.我应该创建某种二叉树与文件夹的叶子是文件?

解决方案2(适用于小示例,不适用于大csv文件)

与解决方案1相同的想法,但不是为每行打开/关闭文件,而是将文件存储在字典中

验证码:

# A dict that will contain all files
account_file_dict = {}

# A function given an account id, returns the file to write in (create new file if do not exist)
def get_account_file(account_id):
  file = account_file_dict.get(account_id, None)
  if file is None:
    file = open(f'./result/{account_id}.csv', 'w')
    account_file_dict[account_id] = file
    file.__enter__()
  return file

# Split the data
p_bar = tqdm.tqdm(total=N_ROW, desc='splitting csv file')
with open('./data.csv') as data_file:
  next(data_file) # skip header
  for row in data_file:
    account_id = row.split(',')[1]
    account_file = get_account_file(account_id)
    account_file.write(row)
    p_bar.update(1)

问题:

我不确定它实际上更快。我必须同时打开5 M文件(每个帐户一个)。我得到一个错误OSError: [Errno 24] Too many open files: './result/33725.csv'

解决方案3(适用于小示例,不适用于大csv文件)

使用awk命令,解决方案来自:split large csv text file based on column value

验证码:

生成文件后,运行:awk -F, 'NR==1 {h=$0; next} {f="./result/"$2".csv"} !($2 in p) {p[$2]; print h > f} {print >> f}' ./data.csv

问题:

我得到以下错误:input record number 28229, file ./data.csv source line number 1(数字28229是一个例子,它通常在28 k左右失败)。我想这也是因为我打开了太多文件

vc9ivgsu

vc9ivgsu1#

@:
虽然不完全是15 GB,但我确实有一个7.6 GB,有3列:
--148 mn素数,它们的base-2 log和它们的hex

in0: 7.59GiB 0:00:09 [ 841MiB/s] [ 841MiB/s] [========>] 100%            
  
  148,156,631 lines 7773.641 MB (  8151253694)  /dev/stdin

字符串
|

f="$( grealpath -ePq ~/master_primelist_19d.txt )"

( time ( for __ in '12' '34' '56' '78' '9'; do 

     ( gawk -v ___="${__}" -Mbe 'BEGIN {

               ___="^["(___%((_+=_^=FS=OFS="=")+_*_*_)^_)"]" 

       }  ($_)~___ && ($NF = int(($_)^_))^!_' "${f}" & ) done | 

  gcat - ) ) | pvE9 > "${DT}/test_primes_squared_00000002.txt"


|

out9: 13.2GiB 0:02:06 [98.4MiB/s] [ 106MiB/s] [ <=> ]

  ( for __ in '12' '34' '56' '78' '9'; do; ( gawk -v ___="${__}" -Mbe  "${f}" &)  

  0.36s user 3     out9: 13.2GiB 0:02:06 [ 106MiB/s] [ 106MiB/s]

  • 仅使用gawk的5个示例和big-integergnu-GMP,每个示例都具有指定的素数前导位子集,

--它设法在 *2分6秒 * 内计算出这些素数的全精度平方,产生一个未排序的13.2 GB输出文件。
如果它能很快地平方,那么仅仅按account_id分组应该是在公园里散步

dldeef67

dldeef672#

1.看看https://docs.python.org/3/library/sqlite3.html你可以导入数据,创建所需的索引,然后正常运行查询。除了python本身之外,没有依赖关系。

  1. https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.scan_csv.html
    1.如果你每次都必须查询原始数据,并且你只受到简单python的限制,那么你可以写一段代码来手动读取它并产生匹配的行,或者使用这样的助手:
from convtools.contrib.tables import Table
from convtools import conversion as c

iterable_of_matched_rows = (
    Table.from_csv("tmp/in.csv", header=True)
    .filter(c.col("account_id") == "1")
    .into_iter_rows(dict)
)

字符串
但是,这不会比使用csv.reader阅读100 M行的csv文件快。

noj0wjuj

noj0wjuj3#

使用DuckDB,完整的100 M行CSV文件(1.8GB)可以轻松地放入普通桌面计算机的内存中,并在几秒钟内运行任何SQL查询
以下是几个步骤:

CREATE TABLE result AS SELECT * from read_csv_auto('data.csv');

字符串
对于演示,我使用以下命令生成数据

CREATE TABLE result (timestamp UINTEGER, account_id UINTEGER, data REAL);                                                                                                                                
Run Time (s): real 0.000 user 0.000180 sys 0.000000                                                                                                                                                        
INSERT INTO result(timestamp,account_id,data)  (SELECT floor(random()*100000::UINTEGER), floor(random()*60000::USMALLINT), round(random()*1000,2)::REAL AS s FROM range(0, 100000000, 1));               
Run Time (s): real 8.242 user 7.732149 sys 0.509185


在内存中生成一个具有1亿行的表需要8秒
SUMMARIZE命令可以显示此表中某些统计信息

SUMMARIZE SELECT * from result;


| 列名|柱式|min| Max|近似唯一|avg| STD| Q25| Q50| Q75|计数|空百分比|
| --|--|--|--|--|--|--|--|--|--|--|--|
| 时间戳|UINTEGER| 0 | 99999 | 99492 |49999.41741353| 28866.929417142343| 25033 | 50009 | 74979 | 100000000 |0.0%|
| 帐户ID| UINTEGER| 0 | 59999 | 59997 |30001.15393498| 17322.073797863097| 15009 | 29984 | 44990 | 100000000 |0.0%|
| 数据|浮子|0.0|一千点| 99244 |499.94853794041455| 288.66460489423315| 250.03180520538703| 500.3373948166276| 749.9874392136103| 100000000 |0.0%|

Run Time (s): real 6.711 user 75.795873 sys 0.086779


您可以导出完整的CSV文件或仅导出特定帐户的一部分

COPY (SELECT timestamp,data from result WHERE account_id=1234) TO 'result_1234.csv' (FORMAT CSV);


这是我电脑上的内存使用情况(不到2GB!)

PRAGMA database_size;

| 数据库名|数据库大小|块大小|总区块数|旧块|自由块|壁尺寸|存储器使用|记忆极限|
| --|--|--|--|--|--|--|--|--|
| 存储器|0字节| 0 | 0 | 0 | 0 |0字节| 1.9GB | 26.8GB |

相关问题