DeepSpeed-MII 在多进程中,当调用客户端推理时阻塞,

vnjpjtjt  于 10个月前  发布在  其他
关注(0)|答案(3)|浏览(134)

我尝试将MII集成到Triton服务器,但遇到了一些问题。以下是我的部分代码:

错误是:当我使用

MII块在

当我使用

能够正常推断,但是gRPC持续报告错误(不影响推断,但服务不稳定)

56lgkhnf

56lgkhnf1#

我遇到了类似的情况。以下是我的代码:

  1. def worker(rank, this_model):
  2. try:
  3. if this_model is None:
  4. client = mii.client('qwen')
  5. else:
  6. client = this_model
  7. response = client.generate(["xxx"], max_new_tokens=1024, stop="<|im_end|>", do_sample=False, return_full_text=True)
  8. print("in worker rank:", rank, " response:", response)
  9. except Exception as e:
  10. print(f"Capture error:{e}")
  11. finally:
  12. print("final")
  13. model = mii.serve(model_dir, deployment_name="qwen", tensor_parallel=xx, replica_num=replica_num)
  14. job_process = []
  15. for rank in range(0, replica_num):
  16. if rank == 0:
  17. job_process.append(threading.Thread(target=worker,args=(rank,model,)))
  18. else:
  19. job_process.append(threading.Thread(target=worker,args=(rank,None,)))
  20. for process in job_process:
  21. process.start()
  22. for process in job_process:
  23. process.join()

当使用 threading.Thread 时,它运行良好。然而,如果使用 multiprocessing.Process ,它将在 client.generate 中被阻塞。

展开查看全部
kqqjbcuj

kqqjbcuj2#

我遇到了类似的情况。以下是我的代码:

  1. def worker(rank, this_model):
  2. try:
  3. if this_model is None:
  4. client = mii.client('qwen')
  5. else:
  6. client = this_model
  7. response = client.generate(["xxx"], max_new_tokens=1024, stop="<|im_end|>", do_sample=False, return_full_text=True)
  8. print("in worker rank:", rank, " response:", response)
  9. except Exception as e:
  10. print(f"Capture error:{e}")
  11. finally:
  12. print("final")
  13. model = mii.serve(model_dir, deployment_name="qwen", tensor_parallel=xx, replica_num=replica_num)
  14. job_process = []
  15. for rank in range(0, replica_num):
  16. if rank == 0:
  17. job_process.append(threading.Thread(target=worker,args=(rank,model,)))
  18. else:
  19. job_process.append(threading.Thread(target=worker,args=(rank,None,)))
  20. for process in job_process:
  21. process.start()
  22. for process in job_process:
  23. process.join()

当使用 threading.Thread 时,它运行良好。然而,如果使用 multiprocessing.Process ,它将在 client.generate 中被阻塞。
由于 Python 中的 threading.Thread 是伪的,由于 GIL ,这段代码无法充分利用并发。这意味着我仍然需要 multiprocessing.Process 来启动新的客户端。然而,上述提到的它并不工作得很好。

展开查看全部
rfbsl7qr

rfbsl7qr3#

我遇到了类似的情况。以下是我的代码:

  1. def worker(rank, this_model):
  2. try:
  3. if this_model is None:
  4. client = mii.client('qwen')
  5. else:
  6. client = this_model
  7. response = client.generate(["xxx"], max_new_tokens=1024, stop="<|im_end|>", do_sample=False, return_full_text=True)
  8. print("in worker rank:", rank, " response:", response)
  9. except Exception as e:
  10. print(f"Capture error:{e}")
  11. finally:
  12. print("final")
  13. model = mii.serve(model_dir, deployment_name="qwen", tensor_parallel=xx, replica_num=replica_num)
  14. job_process = []
  15. for rank in range(0, replica_num):
  16. if rank == 0:
  17. job_process.append(threading.Thread(target=worker,args=(rank,model,)))
  18. else:
  19. job_process.append(threading.Thread(target=worker,args=(rank,None,)))
  20. for process in job_process:
  21. process.start()
  22. for process in job_process:
  23. process.join()

当使用 threading.Thread 时,它运行良好。然而,如果使用 multiprocessing.Process ,它将在 client.generate 中被阻塞。
由于在 Python 中 threading.Thread 是伪的,由于 GIL ,这段代码无法充分利用并发。这意味着我仍然需要 multiprocessing.Process 来启动新的客户端。然而,上述提到的它并不工作得很好。
我找到了 official example 。也许我们应该像 these ways 那样启动服务器和客户端。

展开查看全部

相关问题