关于在Python中使用多处理和slaves的问题,无法调用“bytes”对象

yvfmudvl  于 2022-12-27  发布在  Python
关注(0)|答案(2)|浏览(186)

我刚开始尝试使用Python中的多处理来卸载一些任务。这是这里的基本代码,但我将它作为Orthanc的一部分“Python插件”的一部分使用,如下所示:Orthanc多处理
这是有点复杂,但我的问题似乎是可能相当简单:

“从属进程”

def DelegateStudyArchive(uri):

    new_zip = BytesIO()
    logging.info("In the Slave Handler")
    r = requests.get('http://localhost:8042'+uri, headers = { 'Authorization' : TOKEN })
    logging.info(r.ok)
    logging.info(r.headers)
    archive = r.text # vs. text vs. content
    with ZipFile('/python/radiant_cd.zip', 'r') as radiant_zip:
        with ZipFile(new_zip, 'w') as new_archive:
            for item in radiant_zip.filelist:
                #  To get rid of '__MACOSX' files skip them here
                if '__MACOSX' not in item.filename:
#                     logging.info("Adding " +item.filename+ " to archive")
                    new_archive.writestr(item, radiant_zip.read(item.filename))
                else:
                    logging.info("Skipping " +item.filename+ ", it is a Mac OS file remnant.")
            new_archive.writestr('dcmdata.zip', archive)
            # Important to read as binary, otherwise the codec fails.
            f = open("/python/ReadMe.pdf", "rb")
            new_archive.writestr('ReadMe.pdf', f.read())
            f.close()
    value = new_zip.getvalue()
    return value

主脚本

def OnDownloadStudyArchive(output, uri, **request):

    # Offload the call to "SlowComputation" onto one slave process.
    # The GIL is unlocked until the slave sends its answer back.
    host = "Not Defined"
    userprofilejwt = "Not Defined"
    if "headers" in request and "host" in request['headers']:
        host = request['headers']['host']
    if "headers" in request and "userprofilejwt" in request['headers']:
        userprofilejwt = request['headers']['userprofilejwt']
    logging.info("STUDY|DOWNLOAD_ARCHIVE|ID=" + request['groups'][0] + "  HOST=" + host + "  PROFILE=  " + userprofilejwt)
    uri = uri.replace("_slave", '')
    answer = POOL.apply(DelegateStudyArchive(uri), args=(uri), kwds = {})
    pool.close()
    output.AnswerBuffer(answer, 'application/zip')

orthanc.RegisterRestCallback('/studies/(.*)/archive_slave', OnDownloadStudyArchive)

我已经足够让主脚本调用DelegateStudyArchive(uri),因为日志显示:

2022-12-25 04:55:24,504 | root | INFO     | In the Slave Handler
2022-12-25 04:55:24,525 | urllib3.connectionpool | DEBUG    | Starting new HTTP connection (1): localhost:8042
2022-12-25 04:55:24,686 | urllib3.connectionpool | DEBUG    | http://localhost:8042 "GET /studies/0cc9fb82-726d3dfc-e6f2b353-e96558d7-986cbb2c/archive HTTP/1.1" 200 None
2022-12-25 04:55:25,610 | root | INFO     | JOB|JOB_SUCCESS|{"CompletionTime": "20221225T095525.609389", "Content": {"ArchiveSize": "7520381", "ArchiveSizeMB": 7, "Description": "REST API", "InstancesCount": 51, "UncompressedSize": "17817326", "UncompressedSizeMB": 16}, "CreationTime": "20221225T095524.546173", "EffectiveRuntime": 0.923, "ErrorCode": 0, "ErrorDescription": "Success", "ErrorDetails": "", "ID": "8b619458-5b82-441d-9505-94e68d90398e", "Priority": 0, "Progress": 100, "State": "Success", "Timestamp": "20221225T095525.609624", "Type": "Archive"}
2022-12-25 04:55:25,612 | root | INFO     | JOB|MEDIA|ArchiveorDCMCreatedviaJOB
2022-12-25 04:55:25,622 | root | INFO     | True
2022-12-25 04:55:25,623 | root | INFO     | {'Connection': 'close', 'Content-Disposition': 'filename="0cc9fb82-726d3dfc-e6f2b353-e96558d7-986cbb2c.zip"', 'Content-Type': 'application/zip'}
2022-12-25 04:55:26,468 | charset_normalizer | DEBUG    | Encoding detection: Unable to determine any suitable charset.

但是,我在主脚本中遇到一个错误,内容如下:

E1225 04:55:27.163292 PluginsManager.cpp:153] Error in the REST callback, traceback:
<class 'TypeError'>
'bytes' object is not callable

  File "/python/combined.py", line 2147, in OnDownloadStudyArchive
    answer = POOL.apply(DelegateStudyArchive(uri), args=(uri), kwds = {})

  File "/usr/lib/python3.9/multiprocessing/pool.py", line 357, in apply
    return self.apply_async(func, args, kwds).get()

  File "/usr/lib/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value

所以我认为“answer”是null或者只是抛出一个异常,并且不返回zip文件。我假设/希望有一个简单的修复方法,因为它在其他方面似乎是有效的,如果是这样,我有其他几个地方我想做类似的事情。

gcuhipw9

gcuhipw91#

Thank you for hints. I refactored my code. I think things are complicated a little by the fact that my main script is a "Plug-In" for Orthanc. What I did was to create a separate download.py file for my custom script like:

    • 下载. py**
from zipfile import ZipFile, ZipInfo
import io
from io import BytesIO
import requests
import orthanc

TOKEN = orthanc.GenerateRestApiAuthorizationToken()

def DelegateStudyArchive(uri):
    print("In download.py module via multiprocessing")
    new_zip = BytesIO()
    r = requests.get('http://localhost:8042'+uri, headers = { 'Authorization' : TOKEN })
    archive = r.content # vs. text vs. content
    with ZipFile('/python/radiant_cd.zip', 'r') as radiant_zip:
        with ZipFile(new_zip, 'w') as new_archive:
            for item in radiant_zip.filelist:
                if '__MACOSX' not in item.filename:
                    new_archive.writestr(item, radiant_zip.read(item.filename))
            new_archive.writestr('dcmdata.zip', archive)
            f = open("/python/ReadMe.pdf", "rb")
            new_archive.writestr('ReadMe.pdf', f.read())
            f.close()
    return new_zip.getvalue()

然后在我的主脚本中,导入自定义脚本:

    • 主文件. py**
from  download import DelegateStudyArchive
. . .
# PROTOTYPE FOR DELEGATING TO SLAVE MODULE USING MULTIPROCESSING

def OnDownloadStudyArchive(output, uri, **request):

    # Offload the call to "SlowComputation" onto one slave process.
    # The GIL is unlocked until the slave sends its answer back.
    host = "Not Defined"
    userprofilejwt = "Not Defined"
    if "headers" in request and "host" in request['headers']:
        host = request['headers']['host']
    if "headers" in request and "userprofilejwt" in request['headers']:
        userprofilejwt = request['headers']['userprofilejwt']
    logging.info("Delegating Study Archive Download to download.py module . . . .")
    logging.info("STUDY|DOWNLOAD_ARCHIVE|ID=" + request['groups'][0] + "  HOST=" + host + "  PROFILE=  " + userprofilejwt)
    uri = uri.replace("_slave", '')
    answer = POOL.apply(DelegateStudyArchive, args=(uri, ))
    output.AnswerBuffer(answer, 'application/zip')

orthanc.RegisterRestCallback('/studies/(.*)/archive_slave', OnDownloadStudyArchive)

现在,answer = POOL. apply(DelegateStudyArchive,args =(uri,))可以正常工作,它会根据需要执行脚本。
这实际上是一个更好的安排,因为我有一些其他的方法,目前在主脚本,我想以同样的方式处理,他们也排序'模块化'这种方式。
如果您阅读了本文顶部的链接中的详细信息,您将看到一个更详细的解释,说明为什么在使用Orthanc Python插件时这可能是必要的。

6jygbczu

6jygbczu2#

您已编码:

answer = POOL.apply(DelegateStudyArchive(uri), args=(uri), kwds = {})

这是做什么的?如果你看apply的第一个参数,它是调用DelegateStudyArchive(uri)的结果,DelegateStudyArchive(uri)是一个bytes字符串。这个参数应该只是一个"worker"* 函数 * 的引用。你的第二个参数,作为 * args * 关键字传递的值,应该是一个 * iterable *,枚举所有要传递给DelegateStudyArchive的位置参数。您传递的是uri,字符串。注意,uri周围的圆括号没有任何作用,它等效于编码args=uri。由于字符串是 * iterable *,你需要向辅助函数传递N个参数,其中N是uri的长度,参数只是字符串中的单个字符。

from multiprocessing import Pool

def worker(arg0, arg1, arg2):
    print(arg0)
    print(arg1)
    print(arg2)

if __name__ == '__main__':
    with Pool(1) as pool:
        pool.apply(worker, args=('abc'))

图纸:

a
b
c

因此,需要作为 * args * 关键字值传递的是要传递的实际参数的listtuple

from multiprocessing import Pool

def worker(s):
    print(s)

if __name__ == '__main__':
    with Pool(1) as pool:
        pool.apply(worker, args=('abc',))

图纸:

abc

请注意,args=(s,)中的逗号使(s,)成为tuple
因此,在您的情况下:

answer = POOL.apply(DelegateStudyArchive, args=(uri,), kwds = {})

相关问题