pandas: "cannot pickle 'PyCapsule' object" error when using a pybind11 function with dask

Asked by hs1ihplo on 2023-02-17

Consider the following example, taken from the pybind11 python_example project (https://github.com/pybind/python_example).
setup.py:

import sys

# Available at setup time due to pyproject.toml
from pybind11 import get_cmake_dir
from pybind11.setup_helpers import Pybind11Extension, build_ext
from setuptools import setup

__version__ = "0.0.1"

# The main interface is through Pybind11Extension.
# * You can add cxx_std=11/14/17, and then build_ext can be removed.
# * You can set include_pybind11=false to add the include directory yourself,
#   say from a submodule.
#
# Note:
#   Sort input source files if you glob sources to ensure bit-for-bit
#   reproducible builds (https://github.com/pybind/python_example/pull/53)

ext_modules = [
    Pybind11Extension("python_example",
        ["src/main.cpp"],
        # Example: passing in the version to the compiled code
        define_macros = [('VERSION_INFO', __version__)],
        ),
]

setup(
    name="python_example",
    version=__version__,
    author="Sylvain Corlay",
    author_email="sylvain.corlay@gmail.com",
    url="https://github.com/pybind/python_example",
    description="A test project using pybind11",
    long_description="",
    ext_modules=ext_modules,
    extras_require={"test": "pytest"},
    # Currently, build_ext only provides an optional "highest supported C++
    # level" feature, but in the future it may provide more features.
    cmdclass={"build_ext": build_ext},
    zip_safe=False,
    python_requires=">=3.7",
)

The C++ part is (src/main.cpp):

#include <pybind11/pybind11.h>

#define STRINGIFY(x) #x
#define MACRO_STRINGIFY(x) STRINGIFY(x)

int add(int i, int j) {
    return i + j;
}

namespace py = pybind11;

PYBIND11_MODULE(python_example, m) {
    m.doc() = R"pbdoc(
        Pybind11 example plugin
        -----------------------

        .. currentmodule:: python_example

        .. autosummary::
        :toctree: _generate

        add
        subtract
    )pbdoc";

    m.def("add", &add, R"pbdoc(
        Add two numbers

        Some other explanation about the add function.
    )pbdoc");

    m.def("subtract", [](int i, int j) { return i - j; }, R"pbdoc(
        Subtract two numbers

        Some other explanation about the subtract function.
    )pbdoc");

#ifdef VERSION_INFO
    m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
#else
    m.attr("__version__") = "dev";
#endif
}

The Python code I want to run looks like this (example.py):

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from python_example import add

def python_add(i: int, j: int) -> int:
    return i + j

def add_column_values_python(row: pd.Series) -> pd.Series:
    row['sum'] = python_add(row['i'], row['j'])
    return row

def add_column_values(row: pd.Series) -> pd.Series:
    row['sum'] = add(int(row['i']), int(row['j']))
    return row

def main():
    dataframe = pd.read_csv('./example.csv', index_col=[])
    dataframe['sum'] = np.nan

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values_python, axis=1)).compute(scheduler='processes')

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values, axis=1), meta=pd.Series(dtype='float64')).compute(scheduler='processes')

if __name__ == '__main__':
    main()

The example.csv file looks like this:

i,j
1,2
3,4
5,6
7,8
9,10

But when I run this code with the C++ add version, I get the following error:

[########################################] | 100% Completed | 1.24 s
[                                        ] | 0% Completed | 104.05 ms
Traceback (most recent call last):
  File "/Users/user/local/src/python_example/example.py", line 38, in <module>
    main()
  File "/Users/user/local/src/python_example/example.py", line 33, in main
    dataframe = d_dataframe.map_partitions(
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/base.py", line 314, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/base.py", line 599, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/multiprocessing.py", line 233, in get
    result = get_async(
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/local.py", line 499, in get_async
    fire_tasks(chunksize)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/local.py", line 481, in fire_tasks
    dumps((dsk[key], data)),
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'PyCapsule' object

Is there a way to fix this, perhaps by defining something in the C++ module definition?
Note that this example only serves to illustrate the problem.


mf98qq94 1#

Overview

In Python, if you want to pass an object from one process to another (with or without dask), you need a way to serialize it, and the default mechanism is pickle. Objects coming from a C library are essentially dynamic, pointer-based things that pickle does not know how to handle. You can implement the pickle protocol for your C-backed objects by providing the __getstate__/__setstate__ or __reduce__ dunder methods.
Alternatively, dask has a serialization layer where you can register specific ser/de functions for particular classes, but it is only used with the distributed scheduler, not with multiprocessing (the former is better in every respect, and there is no good reason you should be using multiprocessing).
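
For illustration only, here is a minimal Python-side sketch of the __reduce__ idea: a thin wrapper class (the CAdd name is made up here) whose __reduce__ tells pickle to rebuild the object in the receiving process by re-importing the extension, so the PyCapsule-backed function never has to be serialized:

import python_example  # the pybind11 extension module from the question

class CAdd:
    """Picklable wrapper around the non-picklable extension function."""

    def __init__(self):
        # Resolve the raw pybind11 function at construction time only.
        self._fn = python_example.add

    def __call__(self, i: int, j: int) -> int:
        return self._fn(i, j)

    def __reduce__(self):
        # Tell pickle to rebuild the wrapper from scratch on the receiving
        # side (re-running __init__ and the import there) instead of trying
        # to serialize the PyCapsule-backed function it holds.
        return (CAdd, ())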

Specifics

There are a couple of simpler options:

  • Use the threads scheduler, so no serialization is needed at all (the C++ code should release the GIL to get full parallelism); see the sketch after this list.
  • I think only the add function is the problem, so moving the import into the add_column_values function may be enough; that way each worker gets its own copy instead of having it passed in through the closure.
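
As a rough sketch of the two options above (only scheduler='threads' and the in-function import come from the answer; the compute_sums helper is made up for illustration):

import dask.dataframe as dd
import pandas as pd

def add_column_values(row: pd.Series) -> pd.Series:
    # Option 2: import inside the function so each worker resolves the
    # extension function itself instead of receiving it through pickle.
    from python_example import add
    row['sum'] = add(int(row['i']), int(row['j']))
    return row

def compute_sums(dataframe: pd.DataFrame) -> pd.DataFrame:
    d_dataframe = dd.from_pandas(dataframe, npartitions=16)
    # Option 1: scheduler='threads' keeps everything in one process, so
    # nothing needs to be pickled at all; the C++ add should release the
    # GIL to achieve real parallelism.
    return d_dataframe.map_partitions(
        lambda df: df.apply(add_column_values, axis=1)).compute(scheduler='threads')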

6yjfywim 2#

Thanks to mdurant, I finally got it working; here is the updated code.
main.cpp now looks like this:

#include <pybind11/pybind11.h>

#define STRINGIFY(x) #x
#define MACRO_STRINGIFY(x) STRINGIFY(x)

int add(int i, int j) {
    return i + j;
}

class Add {
public:
    Add() {};
    int add(int i, int j) {
        return i + j;
    }
};

namespace py = pybind11;

PYBIND11_MODULE(python_example, m) {
    m.doc() = R"pbdoc(
        Pybind11 example plugin
        -----------------------

        .. currentmodule:: python_example

        .. autosummary::
        :toctree: _generate

        add
        subtract
    )pbdoc";

    m.def("add", &add, R"pbdoc(
        Add two numbers

        Some other explanation about the add function.
    )pbdoc");

    m.def("subtract", [](int i, int j) { return i - j; }, R"pbdoc(
        Subtract two numbers

        Some other explanation about the subtract function.
    )pbdoc");

#ifdef VERSION_INFO
    m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
#else
    m.attr("__version__") = "dev";
#endif

    py::class_<Add>(m, "Add")
        .def(py::init<>())
        .def("__call__", &Add::add)
        .def("__getstate__", [](const Add &p) {
            /* Return a tuple that fully encodes the state of the object */
            return py::make_tuple();
        })
        .def("__setstate__", [](Add &p, py::tuple t) {
            if (t.size() != 0)
                throw std::runtime_error("Invalid state!");

            /* Invoke the in-place constructor. Note that this is needed even
            when the object just has a trivial default constructor */
            new (&p) Add();

            /* Assign any additional state */
        });
}

The example.py file now looks like this:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from python_example import Add

def python_add(i: int, j: int) -> int:
    return i + j

def add_column_values_python(row: pd.Series) -> pd.Series:
    row['sum'] = python_add(row['i'], row['j'])
    return row

def add_column_values(row: pd.Series) -> pd.Series:
    row['sum'] = Add()(int(row['i']), int(row['j']))
    return row

def add_column_values_import(row: pd.Series) -> pd.Series:
    from python_example import add
    row['sum'] = add(int(row['i']), int(row['j']))
    return row

def main():
    dataframe = pd.read_csv('./example.csv', index_col=[])
    dataframe['sum'] = np.nan

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values_python, axis=1)).compute(scheduler='processes')

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values, axis=1)).compute(scheduler='processes')

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values_import, axis=1)).compute(scheduler='processes')

if __name__ == '__main__':
    main()

The idea is to put the function inside a class and then define the __getstate__ and __setstate__ Python magic methods, or alternatively to do the import inside the function.
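
As a quick sanity check (not part of the original answer), the new Add binding should now survive a pickle round trip before being handed to dask:

import pickle

from python_example import Add

adder = Add()
restored = pickle.loads(pickle.dumps(adder))  # should no longer raise the 'PyCapsule' error
print(restored(1, 2))  # expected output: 3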
For more information: https://pybind11-jagerman.readthedocs.io/en/stable/advanced.html
