Achieving microsecond-level data retrieval with custom data types in Python multiprocessing

dxpyg8gm · posted 2023-11-20 in Python

Problem description:

I am currently working on a Python project that involves multiprocessing and requires efficient inter-process data communication. The data I handle is stored in a list, where each element is a custom data type. The challenge is achieving ultra-fast data retrieval, targeting microsecond-level performance.

Specific requirements:

1. **Data content:** The data to be passed is a list containing custom data types, such as QPointF from PyQt.
2. **Performance target:** The data retrieval process needs to be as fast as possible, aiming for microsecond-level efficiency.
3. **Usability:** Once retrieved, the data should be directly usable, with no additional processing or reorganization.

Previous attempts:

I have explored various solutions in the **multiprocessing** module, including:

* multiprocessing.Queue
* multiprocessing.Pipe
* multiprocessing.managers.ShareableList
* multiprocessing.shared_memory.SharedMemory

Unfortunately, none of these approaches met the desired speed and usability criteria. Retrieval times for **Queue** and **Pipe** were both in the millisecond range, while **ShareableList** and **SharedMemory** do not support lists containing custom data types and require additional post-retrieval reorganization.
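(For context, here is a minimal sketch of one way such round-trip latency can be measured; the echo helper and the 1000-int payload are illustrative stand-ins, not my actual project code.)

import multiprocessing as mp
import time

def echo(q_in, q_out):
    # Echo items back so the parent can measure a full round trip
    while True:
        item = q_in.get()
        if item is None:
            break
        q_out.put(item)

if __name__ == '__main__':
    q_in, q_out = mp.Queue(), mp.Queue()
    p = mp.Process(target=echo, args=(q_in, q_out))
    p.start()
    payload = list(range(1000))  # stand-in for a list of custom objects
    n = 1000
    # Warm up the queues before timing
    q_in.put(payload)
    q_out.get()
    start = time.perf_counter()
    for _ in range(n):
        q_in.put(payload)
        q_out.get()
    per_trip = (time.perf_counter() - start) / n
    print(f'Queue round trip: {per_trip * 1e6:.1f} u-sec')
    q_in.put(None)
    p.join()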

Request for guidance:

I am looking for suggestions or insights on alternative approaches or best practices, in Python's **multiprocessing** module or any other relevant library, for achieving efficient microsecond-level communication of lists containing custom data types.

Question summary:

What other methods or approaches within the **multiprocessing** module can achieve high-speed (microsecond-level) data communication when working with lists of custom data types? Previous attempts with **Queue**, **Pipe**, **ShareableList**, and **SharedMemory** have all fallen short. Any input or suggestions would be greatly appreciated. Thank you!


lqfhib0f 1#

TL;DR - Microsecond-level timing is nearly impossible in out-of-the-box Python. Below is a possible homebrew pure-Python solution using SharedMemory that gets close and may inspire your own development.

Microsecond timing in Python is a fairly brutal target. (Insert the obligatory rant about using Python for performance-sensitive tasks.) I think focusing on low latency, while tolerating some missed messages, can get us close to what you want.
The approach here is a double circular buffer. Since you didn't mention a fixed data size, it's hard to do what you want with a single buffer, and the double circular buffer works around that. The first (DIR) buffer has a fixed stride, so we can jump straight to an entry, and that entry tells us the start/stop positions of the data in the second buffer. The second (DAT) buffer is sized to hold the data itself, and since the DIR buffer already tells us where the data of interest lives, we can jump straight to it there too. Essentially, one buffer is a memory map of the pickled objects in the other. In this implementation, data can be overwritten before it has been read, and any number of consumers is supported. It could be adapted to a single-consumer, no-lost-messages configuration that waits for a read before overwriting. I'm sure there are plenty of optimizations available given better knowledge of the data being moved.
I've left plenty of comments explaining the logic, but feel free to ask follow-up questions in the comments.
I'm assuming all of your objects are picklable, or can be made so, since you mentioned multiprocessing.Queue, which already has that requirement.
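If one of your types turns out not to be picklable, wrapping it is straightforward. A minimal sketch (this PointWrapper class and its fields are illustrative assumptions, not tied to QPointF's actual API):

import pickle

class PointWrapper:
    # Hypothetical wrapper for a point-like object that cannot be pickled
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __reduce__(self):
        # Tell pickle how to rebuild this object on load:
        # call PointWrapper(x, y) again with the saved coordinates
        return (PointWrapper, (self.x, self.y))

p = PointWrapper(1.5, 2.5)
restored = pickle.loads(pickle.dumps(p))
assert (restored.x, restored.y) == (p.x, p.y)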
Your requirements:

  • Data content
  • The object you mentioned has a __reduce__ method according to the docs, so it should work fine, as do most common objects. If one doesn't, you can always wrap it (see the sketch above), though I understand your "no [extra] organization" requirement.
  • Performance target
  • For most reasonably sized data it lands in the 5-20 microsecond range, and likely does better on very small objects; in my testing, a single int was about 1.2 microseconds.
  • Usability
  • This hands back objects just like the solutions you tested; in my testing I found no difference between input and output. It does have a class wrapped around it, so there is some associated overhead, which is unavoidable.

Performance: On my Windows 10 machine (i9-10900K, 2667 MHz RAM, Python 3.11.4), I can get the read time of a 1000-int numpy.array down to about 4.8 microseconds, and single-int reads to about 1.2 microseconds.

import sys
import pickle
import multiprocessing.shared_memory as sm
import time
import uuid
import struct

# PyPi libraries
import numpy as np

# String encoding for storing in the buffer
ENCODING = 'UTF-8'

def getuuid():
    return bytes(str(uuid.uuid4()), encoding=ENCODING)

# GuyQianMessage
class GQM:
    '''
    A low read-latency message service between processes using multiprocessing
    Uses a circular buffer - older data is eventually lost, any process
        reading from this buffer which falls behind will miss entries
    DIR buffer contains UUID, index and start stop positions in the data buffer
    DAT buffer is the data buffer and contains UUID and pickle bytes
    
    Use `put` to put data into the buffer
    Use `get` to retrieve data from the buffer
    Use `get` with verbose=True to get back index and UUID for identifying 
        skipped or repeated data
    
    Typical use case:
    Proc 1:
        gqm = GQM(True)
        while True:
            # Generate data
            gqm.put(mydatatosend)
        # Exit time
        gqm.close()
    
    Proc 2:
        gqm = GQM()
        while True:
            data, index, uuid = gqm.get(verbose=True)
            if uuid == lastuuid:
                # repeat data
                pass
            if index > lastindex + 1:
                # missed data
                pass
            # process data
    
    '''
    
    # These are unique names for the memory space
    SHARED_MEMORY_NAME_DIR = 'SPECIAL_NAME_FOR_GUYQIAN_77500126_DIR'
    SHARED_MEMORY_NAME_DAT = 'SPECIAL_NAME_FOR_GUYQIAN_77500126_DAT'
    
    # Capture the length of a UUID (it's stored in the buffer with the entry)
    UUID_LEN = len(getuuid())
    
    # How many positions to keep in the directory
    #   By checking indexes, processes can determine if they need to also 
    #   check an earlier position for data
    #   Reaching backwards improperly is caught by UUID matching
    DIR_POS_COUNT = 1024
    # The data type of the position in the DIR buffer
    DIR_POS_TYPE = 'L'
    # Pre calculate how much space the position takes up
    DIR_POS_LEN = struct.calcsize(f'<{DIR_POS_TYPE}')
    # Pre generate the format string for each entry in the DIR buffer
    DIR_ROW_FMT = f'<{UUID_LEN}sLLL'
    # Pre calculate the stride in the DIR buffer
    DIR_STRIDE = struct.calcsize(DIR_ROW_FMT)
    # Pre calculate the total DIR buffer size
    BUFFER_SIZE_DIR = struct.calcsize(
        f'<{DIR_POS_TYPE}{DIR_STRIDE * DIR_POS_COUNT}c'
    )
    
    # Set the size of the DAT buffer
    #   Larger holds more entries before overwriting
    #   May need to be high for large datatypes
    BUFFER_SIZE_DAT = int(1e6)
    
    def __init__(self, provider=False):
        '''
        Initialize with provider set to True for the data creator
        Initialize with no arguments or provider set to False for data users
        
        Returns a GQM object
        '''
        # Keep this flag - prevent consumers from trying to put data in
        self.provider = provider
        # If this is the provider, create the buffers using the precalculated
        #   sizes and names
        if self.provider:
            self.smo_dir = sm.SharedMemory(
                name=GQM.SHARED_MEMORY_NAME_DIR,
                create=True,
                size=GQM.BUFFER_SIZE_DIR
            )
            self.smo_dat = sm.SharedMemory(
                name=GQM.SHARED_MEMORY_NAME_DAT,
                create=True,
                size=GQM.BUFFER_SIZE_DAT
            )
        # If this is the consumer, hook into the existing buffer
        else:
            self.smo_dir = sm.SharedMemory(name=GQM.SHARED_MEMORY_NAME_DIR)
            self.smo_dat = sm.SharedMemory(name=GQM.SHARED_MEMORY_NAME_DAT)
        # This is the current byte position in the DAT buffer to write the 
        #   next item to
        self.dat_pos = 0
        # This is the current row number in the DIR buffer to write the 
        #   next look up info to
        self.dir_pos = 0
        # This is the index of the item being put in so that consumers can
        #   determine if they have missed any data
        self.index = 0
    
    def close(self):
        '''
        Call this function from the provider to close out the buffers
        DO NOT call this function before all consumers are finished
        
        Consumers do not need to do anything when finished
        '''
        # Clean up the buffers by unlinking
        # https://docs.python.org/3/library/multiprocessing.shared_memory.html
        if not self.provider:
            # Only the provider should call the close function
            raise AssertionError('This is not a provider! '\
                '- Unlink only from provider!')
        self.smo_dir.unlink()
        self.smo_dat.unlink()
    
    def put(self, obj):
        '''
        Add data to the buffer
        
        Returns
            index of the item added
            a UUID for the item added
        '''
        # Only providers should call put
        if not self.provider:
            raise AssertionError('Put called from consumer!')
        
        # Pickle the object
        #   If it fails, it may not be picklable, see error message below
        try:
            objp = pickle.dumps(obj)
        except TypeError:
            # Capture this error to help developers understand how to fix it
            raise TypeError('Most likely this error was thrown because an '\
                'object which cannot be pickled was passed as a message. '\
                '\n\tFor custom classes, add a __reduce__ method.' \
                '\n\tSee https://stackoverflow.com/questions/19855156'
            )
        except:
            # If something else happened raise the error as is
            raise
        
        # Calculate the amount of space necessary in the DAT buffer
        space_req = GQM.UUID_LEN + len(objp)
        # Check if there is that much left
        #   If not, reset to the beginning of the buffer
        #   Relies on the UUID's to stop improper reads
        #   This makes it a circular buffer
        if self.dat_pos + space_req > GQM.BUFFER_SIZE_DAT:
            self.dat_pos = 0
        # The bytes of the block to write the data to
        start = self.dat_pos
        stop = self.dat_pos + space_req
        # Generate a new UUID to use for this entry so that it can be 
        #   uniquely identified
        uuid = getuuid()
        
        # Put the data into the DAT buffer first
        self.smo_dat.buf[start:stop] = struct.pack(
            f'<{space_req}s',
            uuid + objp
        )
        
        # Next put the data into the DIR buffer
        self._put_dir_row(self.dir_pos, uuid, self.index, start, stop)
        # Lastly update the current entry marker for the DIR buffer
        self._put_dir_pos(self.dir_pos)
        
        # Increment the index to track how many items have been put into the
        #   buffer
        self.index += 1
        # Increment the dir_pos for the next call and wrap on the total length
        #   to fulfill circular buffer property
        self.dir_pos += 1
        self.dir_pos %= GQM.DIR_POS_COUNT
        # Record the next position to start writing to the DAT buffer
        self.dat_pos = stop
        # Return the index and UUID in case they are to be used for something
        #   (self.index was already incremented above, so subtract 1 to
        #   return the index of the item just added)
        return self.index - 1, uuid
    
    def get(self, verbose=False, pos_override=None):
        '''
        Retrieves the latest data from the buffer
        To check if data was skipped or is repeated, check index and uuid
        
        Returns
            The data object
            index of this entry (if verbose)
            uuid of this entry (if verbose)
        
        '''
        # Allow a pos override for testing or advanced usage
        if pos_override is None:
            pos = self._get_dir_pos()
        else:
            # Could add some logic for negatives to allow retrieving previous
            #   entries blindly without checking positions and indexes, etc
            pos = pos_override
        
        # Retrieve the information from the DIR buffer
        uuid_dir, index, start, end = self._get_dir_row(pos)
        # Retrieve the data from the DAT buffer
        uuid_dat, pickledata = struct.unpack(
            f'<{GQM.UUID_LEN}s{end - start - GQM.UUID_LEN}s',
            self.smo_dat.buf[start: end]
        )
        # Verify that the UUID's match
        #   If they don't match the DAT could have been updated while we were
        #   reading the DIR buffer 
        #   It should be possible to resolve by simply recalling get
        # If this triggers often, the DAT buffer may be too small
        if uuid_dat != uuid_dir:
            raise AssertionError('Failed to retrieve matching UUID from data')
        # Read the object back
        outobject = pickle.loads(pickledata)
        # Return the object (and index and UUID if requested)
        if verbose:
            return outobject, index, uuid_dat
        return outobject
    
    def _get_indices_dir(self, pos):
        # Get the indices of the DIR buffer entry
        #   The position indicator is first, and then pos strides is the start
        #   of that entry in the buffer
        start = GQM.DIR_POS_LEN + GQM.DIR_STRIDE * pos
        # Because the DIR buffer is fixed strides, we can just add the stride
        #   to the start to get the end
        return start, start + GQM.DIR_STRIDE
    
    def _put_dir_row(self, pos, uuid, index, start, stop):
        # Put data into a row in the DIR buffer (reverse of _get_dir_row)
        # Get the start and stop positions
        startdir, stopdir = self._get_indices_dir(pos)
        # Put the data in using the pre-generated fmt string
        self.smo_dir.buf[startdir: stopdir] = struct.pack(
            GQM.DIR_ROW_FMT,
            uuid,
            index,
            start,
            stop
        )
    
    def _get_dir_row(self, pos=None):
        # Get a row from the DIR buffer (reverse of _put_dir_row)
        # Allow position overrides for testing or advanced usage
        if pos is None:
            pos = self._get_dir_pos()
        # Get the start and stop positions in the DIR buffer
        start, stop = self._get_indices_dir(pos)
        # Retrieve and unpack the data using the pre-generated fmt string
        uuid, index, startdat, stopdat = struct.unpack(
            GQM.DIR_ROW_FMT,
            self.smo_dir.buf[start: stop]
        )
        # Return all the data collected
        return uuid, index, startdat, stopdat
    
    def _put_dir_pos(self, pos):
        # Update the current DIR buffer position (reverse of _get_dir_pos)
        self.smo_dir.buf[0: GQM.DIR_POS_LEN] = struct.pack(
            f'<{GQM.DIR_POS_TYPE}',
            pos
        )
    
    def _get_dir_pos(self):
        # Retrieve the current DIR buffer position (reverse of _put_dir_pos)
        next_pos = struct.unpack(
            f'<{GQM.DIR_POS_TYPE}',
            self.smo_dir.buf[0: GQM.DIR_POS_LEN]
        )
        # Struct unpack returns a tuple even for a single item in the fmt str
        return next_pos[0]

def gt(s=0):
    # A convenience function for returning times for rough performance calc
    return time.perf_counter() * 1e6 - s

def _demo():
    gqmp = GQM(True)
    gqmp2 = GQM()
    
    # A variety of data types to demonstrate that it can recover the object
    test_data = [
        1,
        'abc',
        b'\x00\x01',
        {'abc': 10},
        list(range(10)),
        np.array(range(100, 1100), int),
        [{'a': [{'b': {'c': [1, 'a',]}}]}]
    ]
    
    for test_point in test_data:
        gqmp.put(test_point)
        x = gqmp2.get()
        assert(pickle.dumps(x) == pickle.dumps(test_point))
    
    print('Example get: ', gqmp2.get(verbose=True))
    
    gqmp.put(np.array(range(1000), int))
    n = int(1e5)
    s = gt()
    for i in range(n):
        gqmp2.get()
    unit_time = gt(s) / n
    print(f'get time: {unit_time:.2f} u-sec')
    # Approx 4.8 u-sec per read
    
    gqmp.close()

def _test():
    
    # Check if u-sec write/read is even feasible
    n = int(1e5)
    mydata = np.array(range(100), int)
    s = gt()
    for i in range(n):
        mypick = pickle.dumps(mydata)
        mynewdata = pickle.loads(mypick)
    unit_time = gt(s) / n # ~8.1
    print(f'Pickle/Unpickle time: {unit_time:.2f} u-sec')
    # Not really: approx 8.1 u-sec per write-read
    
    # Check if u-sec read only is feasible
    n = int(1e5)
    mydata = np.array(range(100), int)
    mypick = pickle.dumps(mydata)
    s = gt()
    for i in range(n):
        mynewdata = pickle.loads(mypick)
    unit_time = gt(s) / n # ~2.5
    print(f'Unpickle time:        {unit_time:.2f} u-sec')
    # Maybe: approx 2.5 u-sec per read

if __name__ == '__main__':
    # Guard against missing command line arguments
    mode = sys.argv[1] if len(sys.argv) > 1 else ''
    if mode == 'test':
        _test()
    elif mode == 'demo':
        _demo()
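The _demo above exercises the buffers within a single process. Here is a minimal sketch of genuine two-process usage, following the "Typical use case" in the docstring (the gqm.py filename, loop counts, and sleeps are illustrative assumptions):

import time
import multiprocessing as mp
from gqm import GQM  # assumes the class above was saved as gqm.py

def consumer():
    # Attach to the buffers the provider created (provider=False)
    gqm = GQM()
    last_index = -1
    for _ in range(200):
        data, index, uid = gqm.get(verbose=True)
        if index != last_index:
            # A changed index means a new entry (equal means a repeat)
            last_index = index
            print('got', index, data)
        time.sleep(0.001)

if __name__ == '__main__':
    provider = GQM(True)        # create the shared buffers
    provider.put(['prime'])     # prime the buffer before any reader attaches
    p = mp.Process(target=consumer)
    p.start()
    for i in range(100):
        provider.put([i, i * 2])  # any picklable payload works
        time.sleep(0.001)
    p.join()
    provider.close()            # unlink only after all consumers finish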

Here is a small diagram briefly showing how the memory is organized: How the memory is organized
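In case the image is unavailable, here is a rough textual approximation of the layout, reconstructed from the constants in the code above:

DIR buffer (fixed stride; one writer position plus DIR_POS_COUNT = 1024 rows):
| current row pos (L) | row 0: uuid|index|start|stop | row 1: ... | ... | row 1023 |

DAT buffer (circular; BUFFER_SIZE_DAT = 1e6 bytes):
| uuid + pickled obj | uuid + pickled obj | ... (wraps back to byte 0 when full)

Each DIR row's start/stop fields point into the DAT buffer.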
