Python multiprocessing with Hive on Linux

Posted by wztqucjr on 2021-06-27 in Hive

The following code works on Windows but hangs on Linux:

from impala.dbapi import connect
from multiprocessing import Pool

# Connection and cursor are created once, at module level, in the parent process
conn = connect(host='172.16.12.12', port=10000, user='hive', password='hive', database='test', auth_mechanism='PLAIN')
cur = conn.cursor()

def test_hive(a):
    cur.execute('select {}'.format(a))
    tab_cc = cur.fetchall()
    tab_cc = tab_cc[0][0]
    print(a, tab_cc)

if __name__ == '__main__':
    pool = Pool(processes=8)
    alist = [1, 2, 3]
    for i in range(len(alist)):
        # args must be a tuple: the pool calls test_hive(*args)
        pool.apply_async(test_hive, (str(i),))
    pool.close()
    pool.join()

When I change alist=[1,2,3] to alist=[1], it works on Linux.
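One likely reason the same script behaves differently on the two systems (an explanation offered here, not something the posters confirmed): on Windows, multiprocessing only supports the spawn start method, so each worker starts a fresh interpreter and re-imports the main module, which runs the module-level connect() call again and gives every worker its own connection. On Linux the default was fork (Python 3.14 changed it to forkserver), so the workers inherit copies of the connection and cursor the parent already opened, and all of them share the same underlying socket. The sketch below uses only the standard library: it reports the available start methods and shows how to request spawn explicitly on Linux.

```python
import multiprocessing as mp


def describe_start_methods():
    """Return (current default start method, all start methods this platform supports)."""
    return mp.get_start_method(), mp.get_all_start_methods()


if __name__ == "__main__":
    default, available = describe_start_methods()
    print("default:", default, "| available:", available)

    # A "spawn" context makes Linux behave like Windows: each worker starts a
    # fresh interpreter and re-imports the main module, so module-level code
    # (such as a connect() call outside the __main__ guard) runs again per worker.
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        print(pool.map(abs, [-1, -2, 3]))  # builtins pickle fine under spawn
```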

Answer #1 (qvsjd97n)

from impala.dbapi import connect
from multiprocessing import Pool

def test_hive(a):
    # Each task opens its own connection and cursor inside the worker process
    conn = connect(host='172.16.12.12', port=10000, user='hive', password='hive', database='test', auth_mechanism='PLAIN')
    try:
        cur = conn.cursor()
        cur.execute('select {}'.format(a))
        tab_cc = cur.fetchall()
        tab_cc = tab_cc[0][0]
        return tab_cc
    finally:
        conn.close()  # release the per-task connection even if the query fails

if __name__ == '__main__':
    pool = Pool(processes=8)
    alist = [1, 2, 4, 4, 4, 4, 5, 3]
    results = []
    for i in range(len(alist)):
        # args must be a tuple: the pool calls test_hive(*args)
        results.append(pool.apply_async(test_hive, (str(i),)))
    pool.close()
    pool.join()
    for result in results:
        try:
            print(result.get())  # re-raises any exception from the worker
        except Exception as e:
            print("{}: {}".format(type(e).__name__, e))

I moved these two lines into test_hive to test whether Hive works:

conn = connect(host='172.16.12.12', port=10000, user='hive', password='hive', database='test',auth_mechanism='PLAIN')
cur = conn.cursor()
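Independently of the hang, a call such as pool.apply_async(test_hive, str(i)) passes a string as the args parameter. The pool calls the function as test_hive(*args), so a one-character string happens to become a single argument, but str(10) would be unpacked into '1' and '0' and the call would fail with a TypeError, which apply_async silently keeps unless get() is called on the result. A minimal sketch, assuming Linux (it requests the fork start method explicitly) and using a hypothetical count_args function in place of test_hive:

```python
from multiprocessing import get_context


def count_args(*args):
    """Hypothetical stand-in for test_hive: reports how many arguments it received."""
    return len(args)


def call_with(args):
    # "fork" is the Linux default start method the question ran under.
    with get_context("fork").Pool(processes=2) as pool:
        return pool.apply_async(count_args, args).get(timeout=10)


if __name__ == "__main__":
    print(call_with("7"))      # one character -> one argument
    print(call_with("10"))     # unpacked as count_args('1', '0')
    print(call_with(("10",)))  # one-element tuple -> one argument
```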

Answer #2 (l7wslrjt)

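I think there are two possible causes for this behavior:
1. an exception raised in test_hive in the context of the forked child processes;
2. a deadlock caused by the fact that fork does not copy the parent's threads, and/or copies mutexes in whatever state they are in at the moment fork is called.

To check for exceptions, add return tab_cc at the end of your test_hive function and collect the results returned by the pool: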

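from multiprocessing import Pool

# test_hive as defined in the question, with `return tab_cc` as its last line

if __name__ == '__main__':
    pool = Pool(processes=8)
    alist = [1, 2, 3]
    results = []
    for i in range(len(alist)):
        # args must be a tuple: the pool calls test_hive(*args)
        results.append(pool.apply_async(test_hive, (str(i),)))
    pool.close()
    pool.join()
    for result in results:
        try:
            print(result.get())  # re-raises any exception raised in the worker
        except Exception as e:
            print("{}: {}".format(type(e).__name__, e))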
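Calling result.get() matters because apply_async does not report failures on its own: if the task raises inside a worker, the exception is stored in the AsyncResult and only re-raised in the parent when get() is called. A minimal sketch, assuming Linux (fork start method requested explicitly) and a hypothetical flaky_task that divides by zero instead of querying Hive; it also uses apply_async's error_callback parameter, which hands the parent the exception as soon as the task fails.

```python
from multiprocessing import get_context


def flaky_task(a):
    """Hypothetical stand-in for test_hive that fails when a == 0."""
    return 10 // a


def collect(values):
    failures = []  # filled by error_callback in the parent process
    with get_context("fork").Pool(processes=2) as pool:
        results = [
            pool.apply_async(flaky_task, (v,), error_callback=failures.append)
            for v in values
        ]
        outcomes = []
        for result in results:
            try:
                outcomes.append(result.get(timeout=10))
            except Exception as e:  # the worker's exception, re-raised here
                outcomes.append("{}: {}".format(type(e).__name__, e))
    return outcomes, failures


if __name__ == "__main__":
    print(collect([5, 0, 2]))
```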

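As for threads: I looked around in the impala repo, and it seems to use thrift. I'm not sure whether Python's threading module can actually see threads that come from that library. You could try calling print(multiprocessing.current_process(), threading.enumerate()) at module level (e.g. right after cur = conn.cursor()) and at the start of the test_hive function, and check whether _MainProcess(MainProcess, started) shows a longer list of active threads than all of the ForkProcess(ForkPoolWorker-<worker#>, started daemon) entries.
As for a possible fix: I somewhat suspect that creating conn and cur at module level is the culprit; all the child processes use copies of these two objects.
Try moving these two lines into test_hive, so that each process creates a connection and a cursor of its own: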
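The diagnostic suggested above can be tried without Hive. In this sketch (Linux-only, since it requests the fork start method), a hypothetical background thread stands in for a thread a client library might start. It appears in the parent's threading.enumerate(), but a worker forked afterwards lists only its own main thread, because fork copies only the thread that called it. That is also why a lock held by such a thread at the moment of the fork can stay locked forever in the child. threading.enumerate() only lists threads that Python's threading module tracks, which is the caveat raised above about thrift.

```python
import multiprocessing as mp
import threading


def report_threads():
    """Return (process name, names of the threads Python's threading module sees)."""
    return mp.current_process().name, [t.name for t in threading.enumerate()]


def compare_parent_and_worker():
    stop = threading.Event()
    # Hypothetical background thread standing in for a library-owned thread.
    helper = threading.Thread(target=stop.wait, name="background-helper", daemon=True)
    helper.start()
    try:
        parent = report_threads()
        with mp.get_context("fork").Pool(processes=1) as pool:
            worker = pool.apply(report_threads)  # runs in the forked worker
    finally:
        stop.set()
        helper.join()
    return parent, worker


if __name__ == "__main__":
    parent, worker = compare_parent_and_worker()
    print(parent)
    print(worker)
```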

conn = connect(host='172.16.12.12', port=10000, user='hive', password='hive', database='test',auth_mechanism='PLAIN')
cur = conn.cursor()
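A variation on moving the two lines into test_hive (an alternative, not what either answer posted): creating the connection inside test_hive opens a new connection for every task. Pool's initializer argument runs a function once in each worker process, so each worker can open one connection after the fork and reuse it for all of its tasks. In this sketch FakeConnection and make_connection are hypothetical stand-ins so the code runs without Hive; with impyla, make_connection would call impala.dbapi.connect(...) with the connection arguments used above. It requests the fork start method explicitly, so it is Linux-only as written.

```python
import os
from multiprocessing import get_context

_conn = None  # one connection per worker process, set by init_worker


class FakeConnection:
    """Hypothetical stand-in for an impyla connection; records the opening process."""

    def __init__(self):
        self.pid = os.getpid()


def make_connection():
    # Hypothetical: with impyla this would be impala.dbapi.connect(host=..., ...).
    return FakeConnection()


def init_worker():
    # Runs once in each worker process, after the fork.
    global _conn
    _conn = make_connection()


def run_query(a):
    # With impyla: cur = _conn.cursor(); cur.execute(...); return cur.fetchall()[0][0]
    # Here we just show that the connection was opened by this very worker.
    return a, _conn.pid, os.getpid()


def run_all(values, processes=2):
    ctx = get_context("fork")
    with ctx.Pool(processes=processes, initializer=init_worker) as pool:
        return pool.map(run_query, values)


if __name__ == "__main__":
    for a, conn_pid, worker_pid in run_all([1, 2, 3]):
        print(a, conn_pid == worker_pid)
```

Because each worker builds its connection after the fork, no socket is inherited from the parent, and the number of connections is bounded by the pool size rather than the number of tasks.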
