pandas: How do I lazily GroupShuffleSplit a parquet DataFrame?

bbmckpt7 · posted on 2023-09-29

I have a parquet dataset that looks like this (I'm using polars, but any dataframe library would do):

df = pl.DataFrame(
    {
        "match_id": [
            1, 1, 1,
            2, 2, 2, 2,
            3, 3, 3, 3,
        ],
        "team_id": [
            1, 2, 2,
            1, 1, 2, 2,
            1, 2, 2, 3,
        ],
        "player_name": [
            "kevin", "james", "kelly",
            "john", "jenny", "jim", "josh",
            "alice", "kevin", "lilly", "erica",
        ],
    }
)

I want to group by match_id and do a train/test split so that 80% of the matches end up in the training set and the rest in the test set. Something like:

group_df = df.group_by(["match_id"])
train, test = group_split(group_df, test_size=0.20)

I need a Python solution, preferably with dask, pandas, or another dataframe library. The dataset is very large and pandas does not support lazy evaluation, so pandas seems to be out of the question. Dask, on the other hand, does not support any of the sklearn.model_selection splitters, since it has no integer-based indexing.
Ideally I just want a simple GroupShuffleSplit with dask. Is there another library that supports this? If so, how do I use it with parquet in a lazy way?


bkhjykvo1#

Maybe something like this will work for you.
It is not a perfect answer, but it tries to deal with the large data volume.
In this solution GroupShuffleSplit is applied to each partition of the data rather than to the whole dataset, and because match_id.unique is taken per partition, the train/test split may not come out at exactly 20/80.

Solution

import dask.dataframe as dd
import numpy as np
from sklearn.model_selection import GroupShuffleSplit

df = dd.read_parquet('test.parquet')  # the file generated in the test section below

train = []
test = []
gss = GroupShuffleSplit(n_splits=1, test_size=0.20, random_state=42)  # Adjust random_state as needed

for i in range(df.npartitions):
    part = df.partitions[i]
    # unique match_ids within this partition only
    groups = part.match_id.unique().compute().to_numpy()
    # split the group values, then map the returned positional indices back to match_ids
    train_idx, test_idx = next(gss.split(groups, groups=groups))
    train += [part[part.match_id.isin(groups[train_idx])]]
    test += [part[part.match_id.isin(groups[test_idx])]]

# now the test list holds lazy dask dataframes;
# to fetch data from them just concat and compute

dd.concat(test).shape[0].compute() # will give in my case 282_111_648
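If the proportions matter, a variant of the above (a sketch, not part of the original answer) could instead collect the unique match_ids of the whole dataset once and split those, while the row-level filtering stays lazy:

# reuse df and gss from the solution above;
# split on the global set of match_ids so that 20% of the groups go to the test set
all_groups = df.match_id.unique().compute().to_numpy()
train_idx, test_idx = next(gss.split(all_groups, groups=all_groups))
train = df[df.match_id.isin(all_groups[train_idx])]
test = df[df.match_id.isin(all_groups[test_idx])]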

The solution was tested with the following data:

import polars as pl
import numpy as np
from sklearn.model_selection import GroupShuffleSplit
pl.build_info().get('version')
# '0.19.2'
n_rows = 10**6 # 1_000_000 rows
df = pl.DataFrame([
    pl.Series('match_id', np.random.choice(range(10**3), size=n_rows)), # 1_000 matches
    pl.Series('team_id', np.random.choice(range(10**2), size=n_rows)), # 100 teams
    pl.Series('player_name', np.random.choice([
            "kevin", "james", "kelly",
            "john", "jenny", "jim", "josh",
            "alice", "kevin", "lilly", "erica",
        ], size=n_rows))
]).lazy()
df = pl.concat([df]*1_000) # 1_000_000_000 rows
df.collect(streaming=True).write_parquet('test.parquet') # ~5GB

w41d8nur2#

import dask.dataframe as dd
import numpy as np
from sklearn.model_selection import GroupShuffleSplit

ddf = dd.read_parquet('your_dataset.parquet')

def dask_group_shuffle_split(df, groups, test_size=0.2, random_state=None):
    # Get the unique group values (assumed small enough to hold in memory)
    unique_groups = df[groups].drop_duplicates().compute().to_numpy()

    # Perform the GroupShuffleSplit on the unique group values
    splitter = GroupShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
    train_idx, test_idx = next(splitter.split(unique_groups, groups=unique_groups))

    # Map the returned positional indices back to group values
    train_groups = unique_groups[train_idx]
    test_groups = unique_groups[test_idx]

    # Filter the original (still lazy) DataFrame based on the selected groups
    train = df[df[groups].isin(train_groups)]
    test = df[df[groups].isin(test_groups)]

    return train, test, train_groups, test_groups

train, test, train_groups, test_groups = dask_group_shuffle_split(ddf, groups='match_id', test_size=0.2, random_state=42)

vsaztqbk3#

If your main goal is to split the data while keeping groups together, and you want a lazy computation engine, Dask is indeed a good choice.

import dask.dataframe as dd
import pandas as pd
import numpy as np

# For this example, I used the 'from_pandas' method (for my environment)
# In your actual use-case, you should use the dd.read_parquet() method.

pdf = pd.DataFrame(
    {
        "match_id": [
            1, 1, 1,
            2, 2, 2, 2,
            3, 3, 3, 3,
        ],
        "team_id": [
            1, 2, 2,
            1, 1, 2, 2,
            1, 2, 2, 3,
        ],
        "player_name": [
            "kevin", "james", "kelly",
            "john", "jenny", "jim", "josh",
            "alice", "kevin", "lilly", "erica",
        ],
    }
)

ddf = dd.from_pandas(pdf, npartitions=2)

# Here you need the unique match_ids
unique_matches = ddf['match_id'].unique().compute()

# Here you need to shuffle the unique matches
shuffled_matches = np.random.permutation(unique_matches)

# Here you need to split indices for train-test
split_idx = int(0.8 * len(shuffled_matches))

train_matches = shuffled_matches[:split_idx]
test_matches = shuffled_matches[split_idx:]

# Then filter out the records based on match_id
train_df = ddf[ddf['match_id'].isin(train_matches)]
test_df = ddf[ddf['match_id'].isin(test_matches)]

# The above operation is still lazy. You can compute to get the actual dataframe.
train_df_computed = train_df.compute()
test_df_computed = test_df.compute()

print(train_df_computed)
print(test_df_computed)

My approach is manual and not as elegant as using GroupShuffleSplit, but it gets the job done. If your data is already in Parquet files, you can read it directly into a Dask DataFrame with dd.read_parquet(). As for the .compute() method, it is what triggers the actual computation in Dask; before that, everything is just a lazy operation.
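A minimal sketch of that substitution, assuming a hypothetical file name 'matches.parquet' (the split logic above is unchanged):

import dask.dataframe as dd

# Read the Parquet dataset lazily instead of building it with from_pandas
ddf = dd.read_parquet('matches.parquet')

# The isin() filters stay lazy; nothing is materialized until .compute() is called
train_df = ddf[ddf['match_id'].isin(train_matches)]
test_df = ddf[ddf['match_id'].isin(test_matches)]
train_df_computed = train_df.compute()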


tquggr8v4#

If you just want to use the GroupBy approach, here is one way to do it.

import math
import pandas as pd

def group_split(grouped_data, test_size=0.2):
    ngroups = grouped_data.ngroups
    train_size = ngroups - math.ceil(ngroups * test_size)

    group_names = list(grouped_data.groups.keys())
    train_data = pd.concat((grouped_data.get_group(group_id) for group_id in group_names[:train_size]), ignore_index=True)
    test_data = pd.concat((grouped_data.get_group(group_id) for group_id in group_names[train_size:]), ignore_index=True)
    return train_data, test_data

For the same example data (grouped with pandas, e.g. grouped = df.groupby("match_id")), the output is

group_split(grouped, 0.2)

(   match_id  team_id player_name
 0         1        1       kevin
 1         1        2       james
 2         1        2       kelly
 3         2        1        john
 4         2        1       jenny
 5         2        2         jim
 6         2        2        josh,
    match_id  team_id player_name
 0         3        1       alice
 1         3        2       kevin
 2         3        2       lilly
 3         3        3       erica)

You could also add shuffling and other features by working with the group_names variable; this was left out above for brevity.
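For example, a minimal sketch of a shuffled variant (the seed handling is my own assumption, not part of the original answer):

import math
import random
import pandas as pd

def group_split_shuffled(grouped_data, test_size=0.2, seed=42):
    ngroups = grouped_data.ngroups
    train_size = ngroups - math.ceil(ngroups * test_size)

    # Shuffle the group names so the train/test assignment is random rather than positional
    group_names = list(grouped_data.groups.keys())
    random.Random(seed).shuffle(group_names)

    train_data = pd.concat((grouped_data.get_group(g) for g in group_names[:train_size]), ignore_index=True)
    test_data = pd.concat((grouped_data.get_group(g) for g in group_names[train_size:]), ignore_index=True)
    return train_data, test_data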


jutyujz05#

You can use dask.dataframe to perform the group-and-split operation on a Parquet dataset. Below is a sample code snippet that should do what you described:

import dask.dataframe as dd
from sklearn.model_selection import GroupShuffleSplit

# Load your Parquet file into a dask dataframe
dd_df = dd.read_parquet('path/to/your/file.parquet')

# Collect the unique match_ids and split them into train and test groups
match_ids = dd_df['match_id'].unique().compute().to_numpy()
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, test_idx = next(gss.split(match_ids, groups=match_ids))

# Filter the (still lazy) dataframe by the selected match_ids
train = dd_df[dd_df['match_id'].isin(match_ids[train_idx])]
test = dd_df[dd_df['match_id'].isin(match_ids[test_idx])]

# The resulting train and test dataframes will be dask dataframes
print(train.head())
print(test.head())

This code uses dask.dataframe to load the Parquet file and perform the grouping and splitting. The GroupShuffleSplit class from sklearn.model_selection splits the unique match_id values into training and test groups, and the random_state parameter is set to 42 for reproducibility. The resulting train and test DataFrames are dask DataFrames, which you can then use for further processing or modelling. Note that since you are working with a very large dataset, dask DataFrames are recommended over pandas DataFrames to avoid memory limitations.
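If you want to persist the two splits, a short sketch (the output directory names are assumptions) could write each lazy dask dataframe back to Parquet, which processes the data partition by partition instead of collecting everything in memory at once:

# Write each split back to partitioned Parquet files
train.to_parquet('train_split/')
test.to_parquet('test_split/')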
