I set up Dask Gateway with YARN on a kerberized Hadoop cluster, and it handles the example DataFrames perfectly, scaling up and down as expected.
However, when I try to read data residing on HDFS via WebHDFS, it fails.
Here are the steps I took:
1. Without starting a Dask Gateway cluster, I ran the following code in my Python environment:
import dask.dataframe as dd
# read data from HDFS using webhdfs and dask
df = dd.read_parquet('webhdfs://namenode.domain.com/pathtofile/part-*.parquet',
storage_options={'kerberos':True})
df.head()
df.x0.mean().compute()
It works like a charm. The file is about 18 GB, while my client machine has only about 4 GB of memory.
2. I start my Dask Gateway cluster and create a client; all of this works fine:
from dask_gateway import Gateway
gateway = Gateway("http://dask-gatewaynode.domain.com:8000", auth="kerberos")
cluster = gateway.new_cluster(worker_cores=1, worker_memory=4)
cluster.adapt(minimum=1, maximum=100)
client = cluster.get_client()
3. Now, when I run the code from step 1, I get a 401 error, which suggests that the client's Kerberos credentials are not being passed to the workers.
Logs from the gateway worker:
distributed.core - INFO - Starting established connection
distributed.protocol.pickle - INFO - Failed to serialize 401 Client Error: Authentication required for url: http://namenode.domain.com:50070/webhdfs/v1/pathtofile/part-00000-a3d442a7-d97a-4c45-8d81-7528f7f2bc00-c000.snappy.parquet?op=GETFILESTATUS. Exception: can't pickle PyCapsule objects
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function safe_head at 0x7f2a91c15710>, (<function read_parquet_part at 0x7f2a90650b00>, <function ArrowEngine.read_partition at 0x7f2a90071680>, <fsspec.implementations.webhdfs.WebHDFS object at 0x7f2a863c63d0>, Empty DataFrame
Columns: [monotonically_increasing_id, x0, x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12, x13, x14, x15, x16, x17, x18, x19, x20, x21, x22, x23, x24, x25, x26, x27, x28, x29, x30, x31, x32, x33, x34, x35, x36, x37, x38, x39, x40, x41, x42, x43, x44, x45, x46, x47, x48, x49, x50, x51, x52, x53, x54, x55, x56, x57, x58, x59, x60, x61, x62, x63, x64, x65, x66, x67, x68, x69, x70, x71, x72, x73, x74, x75, x76, x77, x78, x79, x80, x81, x82, x83, x84, x85, x86, x87, x88, x89, x90, x91, x92, x93, x94, x95, x96, x97, x98, ...]
Index: []
[0 rows x 1001 columns], [('pathtofile/part-00000--<hashkey>.snappy.parquet', None, [])], ['monotonically_increasing_id', 'x0', 'x1', 'x2', 'x3', 'x4', 'x5', 'x6',
kwargs: {}
Exception: Exception('401 Client Error: Authentication required for url: http://namenode.domain.com:50070/webhdfs/v1/pathtofile/part-00000-<hashkey>.snappy.parquet?op=GETFILESTATUS')
The Python environment configured on the Dask Gateway nodes includes the following packages (only the relevant ones pasted):
dask 2.16.0
dask-core 2.16.0
dask-gateway 0.7.1
dask-gateway-kerberos 0.7.1
distributed 2.17.0
fsspec 0.7.4
Has anyone run into something similar? Do you have any ideas on how to make this work, or an alternative approach?
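One direction I have been considering but have not verified: since the workers have no Kerberos ticket of their own, fetch an HDFS delegation token on the client (where my ticket is valid) and hand that token to the workers instead of `kerberos=True` — fsspec's WebHDFS filesystem also accepts a `token` argument. The helper below only builds the standard WebHDFS `GETDELEGATIONTOKEN` URL; the token fetch and `read_parquet` call are sketched in comments because they need a live kerberized cluster, and the hostname/user are placeholders from my setup:

```python
def delegation_token_url(namenode, user, port=50070):
    """Build the WebHDFS REST URL that requests an HDFS delegation token.

    GETDELEGATIONTOKEN is a standard WebHDFS operation; the request itself
    must be made with a valid Kerberos ticket (e.g. on the client after kinit).
    """
    return (f"http://{namenode}:{port}/webhdfs/v1/"
            f"?op=GETDELEGATIONTOKEN&renewer={user}")

# Untested sketch: fetch the token client-side, then pass it to the workers
# through storage_options -- fsspec's WebHDFS filesystem takes `token` as an
# alternative to `kerberos=True`, so the workers never need their own ticket.
#
#   import requests
#   from requests_kerberos import HTTPKerberosAuth
#   import dask.dataframe as dd
#
#   resp = requests.get(delegation_token_url('namenode.domain.com', 'myuser'),
#                       auth=HTTPKerberosAuth())
#   token = resp.json()['Token']['urlString']
#   df = dd.read_parquet('webhdfs://namenode.domain.com/pathtofile/part-*.parquet',
#                        storage_options={'token': token})
```

Delegation tokens expire, so this would also need periodic renewal, but it might at least confirm whether the 401 is purely a missing-credentials problem on the workers.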