Consider the following example, taken from the pybind11 python_example repository (https://github.com/pybind/python_example):
setup.py:
import sys
# Available at setup time due to pyproject.toml
from pybind11 import get_cmake_dir
from pybind11.setup_helpers import Pybind11Extension, build_ext
from setuptools import setup
__version__ = "0.0.1"
# The main interface is through Pybind11Extension.
# * You can add cxx_std=11/14/17, and then build_ext can be removed.
# * You can set include_pybind11=false to add the include directory yourself,
#   say from a submodule.
#
# Note:
#   Sort input source files if you glob sources to ensure bit-for-bit
#   reproducible builds (https://github.com/pybind/python_example/pull/53)
ext_modules = [
    Pybind11Extension("python_example",
        ["src/main.cpp"],
        # Example: passing in the version to the compiled code
        define_macros = [('VERSION_INFO', __version__)],
        ),
]
setup(
    name="python_example",
    version=__version__,
    author="Sylvain Corlay",
    author_email="sylvain.corlay@gmail.com",
    url="https://github.com/pybind/python_example",
    description="A test project using pybind11",
    long_description="",
    ext_modules=ext_modules,
    extras_require={"test": "pytest"},
    # Currently, build_ext only provides an optional "highest supported C++
    # level" feature, but in the future it may provide more features.
    cmdclass={"build_ext": build_ext},
    zip_safe=False,
    python_requires=">=3.7",
)
The C++ part (src/main.cpp):
#include <pybind11/pybind11.h>
#define STRINGIFY(x) #x
#define MACRO_STRINGIFY(x) STRINGIFY(x)
int add(int i, int j) {
    return i + j;
}
namespace py = pybind11;
PYBIND11_MODULE(python_example, m) {
    m.doc() = R"pbdoc(
        Pybind11 example plugin
        -----------------------
        .. currentmodule:: python_example
        .. autosummary::
           :toctree: _generate
           add
           subtract
    )pbdoc";
    m.def("add", &add, R"pbdoc(
        Add two numbers
        Some other explanation about the add function.
    )pbdoc");
    m.def("subtract", [](int i, int j) { return i - j; }, R"pbdoc(
        Subtract two numbers
        Some other explanation about the subtract function.
    )pbdoc");
#ifdef VERSION_INFO
    m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
#else
    m.attr("__version__") = "dev";
#endif
}
The Python code I want to run (example.py):
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from python_example import add
def python_add(i: int, j: int) -> int:
    return i + j
def add_column_values_python(row: pd.Series) -> pd.Series:
    row['sum'] = python_add(row['i'], row['j'])
    return row  # return the row so the annotated pd.Series is actually produced
def add_column_values(row: pd.Series) -> pd.Series:
    row['sum'] = add(int(row['i']), int(row['j']))
    return row  # return the row so the annotated pd.Series is actually produced
def main():
    dataframe = pd.read_csv('./example.csv', index_col=[])
    dataframe['sum'] = np.nan
    # First pass: pure-Python add (this one completes)
    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values_python, axis=1)).compute(scheduler='processes')
    # Second pass: C++ add from python_example (this one fails)
    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values, axis=1), meta=pd.Series(dtype='float64')).compute(scheduler='processes')
if __name__ == '__main__':
    main()
The example.csv file looks like this:
i,j
1,2
3,4
5,6
7,8
9,10
But when I run this code with the C++ version of add, I get the following error:
[########################################] | 100% Completed | 1.24 ss
[ ] | 0% Completed | 104.05 ms
Traceback (most recent call last):
File "/Users/user/local/src/python_example/example.py", line 38, in <module>
main()
File "/Users/user/local/src/python_example/example.py", line 33, in main
dataframe = d_dataframe.map_partitions(
File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/base.py", line 314, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/base.py", line 599, in compute
results = schedule(dsk, keys, **kwargs)
File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/multiprocessing.py", line 233, in get
result = get_async(
File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/local.py", line 499, in get_async
fire_tasks(chunksize)
File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/local.py", line 481, in fire_tasks
dumps((dsk[key], data)),
File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'PyCapsule' object
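Is there a way to solve this, perhaps by defining something in the C++ module definition?
Note that this example is only meant to illustrate the problem.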
2 Answers
Answer #1
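General
In Python, if you want to pass an object from one process to another (with or without dask), you need a way to serialize it. The default method is pickle. Objects from C libraries are fundamentally dynamic, pointer-based things, and pickle does not know what to do with them. You can implement the pickle protocol for your C object by providing the __getstate__/__setstate__ or __reduce__ dunder methods.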
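To make the pickle protocol concrete, here is a minimal pure-Python sketch of the __reduce__ route. The Counter class is hypothetical, and a threading.Lock stands in for the native, pointer-based state that pickle cannot handle; it is not code from the question.

```python
import pickle
import threading


class Counter:
    """Toy object holding state that pickle cannot serialize."""

    def __init__(self, value: int = 0):
        self.value = value
        # Stand-in (assumption) for a native handle; locks are not picklable either.
        self._handle = threading.Lock()

    def __reduce__(self):
        # Tell pickle to rebuild the object by calling Counter(self.value),
        # which creates a fresh handle in the receiving process.
        return (Counter, (self.value,))


original = Counter(5)
clone = pickle.loads(pickle.dumps(original))
print(clone.value)
```

Only the plain value travels through pickle; the handle is recreated on the other side, which is the general shape of any fix for an extension type.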
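Alternatively, dask has a serialization layer where you can register specific ser/de functions for specific classes, but that only applies with the distributed scheduler, not with multiprocessing (the former is better in every way, and you have no good reason to be using multiprocessing).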
Specific
A couple of simpler options:
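I think only the add function is the problem; it is probably enough to move the import into the add_column_values function, so that each worker gets its own copy instead of having it passed in from the closure.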
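The import-inside-the-function suggestion could look roughly like the sketch below. The fallback to operator.add is an assumption added only so the snippet runs without the compiled python_example module, and a plain dict stands in for the pandas row.

```python
from typing import MutableMapping


def add_column_values(row: MutableMapping) -> MutableMapping:
    # Import inside the function body so the process that runs it
    # imports the compiled module itself; nothing native is pickled.
    try:
        from python_example import add
    except ImportError:
        # Stand-in (assumption) so the sketch runs without the compiled module.
        from operator import add
    row["sum"] = add(int(row["i"]), int(row["j"]))
    return row


result = add_column_values({"i": 1, "j": 2})
print(result["sum"])
```

Because add is no longer a module-level global referenced by the function, the serialized task no longer needs to carry the compiled function with it.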
Answer #2
Thanks to mdurant I finally got it working. Here is the updated code:
main.cpp now looks like this:
The example.py file looks like this:
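The idea is to put the function inside a class and then define the __getstate__ and __setstate__ Python magic methods, or to do the import inside the function. For more information: https://pybind11-jagerman.readthedocs.io/en/stable/advanced.html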
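The updated code itself is not shown here, so the following is not that code. It is a Python-side sketch of the same idea: wrap the function in a class whose __getstate__/__setstate__ ship only names and re-import on arrival. PicklableFunction is a hypothetical helper, and operator.add stands in for python_example.add.

```python
import importlib
import pickle


class PicklableFunction:
    """Callable wrapper that pickles a function by module and attribute name."""

    def __init__(self, module_name: str, attr_name: str):
        self.module_name = module_name
        self.attr_name = attr_name
        self._func = self._load()

    def _load(self):
        # Look the function up again in whichever process we are in.
        return getattr(importlib.import_module(self.module_name), self.attr_name)

    def __call__(self, *args):
        return self._func(*args)

    def __getstate__(self):
        # Ship only the two names; the function object itself is dropped.
        return {"module_name": self.module_name, "attr_name": self.attr_name}

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._func = self._load()


# Stand-in (assumption) for PicklableFunction("python_example", "add")
add = PicklableFunction("operator", "add")
clone = pickle.loads(pickle.dumps(add))
print(clone(1, 2))
```

The same division of labor applies when the class is defined on the C++ side: the state that crosses the process boundary is plain data, and the native part is rebuilt by the receiver.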