Python: retrieving data from its memory address

7xzttuei · posted 2023-11-20 in Python

I currently have a very large dataset (~15 GB) that needs to be used by several Python scripts at the same time.
Loading individual elements of the dataset takes too long, so the whole dataset has to live in memory.
My best approach so far is to have one Python process load all the elements and let the other Python scripts request elements over a socket connection to localhost.
However, I then run into problems encoding and decoding the data sent over the socket. So my next best idea is the following, though I don't know whether it is even possible:
1. Have the scripts send a request over the socket to the process that owns the data, containing the index of the data point they want to use
2. Have the process that owns the data return the memory address of that data point
3. Have the scripts load the element from there
So the question is: is this possible?
Regards
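
For reference, here is a minimal sketch of the index-over-a-socket request/response described in the question, assuming pickle is an acceptable way to encode the elements; the host/port values and helper names are made up for illustration.

# Sketch of the question's socket approach: a script sends an index,
# the process that owns the data replies with the element. Both messages
# are pickled and length-prefixed so the receiver knows where they end.
import pickle
import socket
import struct

HOST, PORT = '127.0.0.1', 50007  # illustrative localhost endpoint

def _recv_exact(conn, n):
    # Keep reading until exactly n bytes have arrived
    buf = b''
    while len(buf) < n:
        chunk = conn.recv(n - len(buf))
        if not chunk:
            raise ConnectionError('socket closed early')
        buf += chunk
    return buf

def send_obj(conn, obj):
    # Length-prefix the pickled payload
    payload = pickle.dumps(obj)
    conn.sendall(struct.pack('<q', len(payload)) + payload)

def recv_obj(conn):
    (length,) = struct.unpack('<q', _recv_exact(conn, 8))
    return pickle.loads(_recv_exact(conn, length))

def serve(dataset):
    # Run in the process that owns the data: answer one index request per connection
    with socket.create_server((HOST, PORT)) as srv:
        while True:
            conn, _ = srv.accept()
            with conn:
                send_obj(conn, dataset[recv_obj(conn)])

def request(index):
    # Run in any other script: ask for the element at 'index'
    with socket.create_connection((HOST, PORT)) as conn:
        send_obj(conn, index)
        return recv_obj(conn)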

lf5gs5x2 · #1

The details here are a bit sparse, but here is a solution that matches what you're asking for. I'm assuming your data is roughly tabular (like a CSV or a pandas.DataFrame) and that the data type within each field is consistent.
To use this:

  • Update the DATA_FIELDS, STRUCT_FMT_STR, ENCODING, and STR_WIDTH information to match your data
  • Update the provider to load your data instead of the toy data I create
  • Run the provider and wait until it reports 'ready'
  • Start as many consumers as you like by re-running the script with the consumer argument.
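
For example, if the script is saved as quasi_77500330.py (the file name here is just an example), that means running python quasi_77500330.py makedata once to generate the toy data (not needed once the provider loads your real data), then python quasi_77500330.py provider in one terminal, and python quasi_77500330.py consumer in as many other terminals as you like.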

If you're doing a lot of testing (edit the script, run it, check the result, edit, check again, and so on), you can start one provider and then repeatedly run your tests with consumers without having to reload the data, because consumers can be started and stopped over and over as long as the provider stays alive.
If you have a lot of strings and/or your strings can get really long, this can waste a lot of space, and you may want to consider storing the strings separately to save space (a sketch of one way to do that follows the script below).
Let me know if you have any questions. I've tried to include plenty of comments to help explain the logic, and I hope it helps.

# Default libraries
import multiprocessing.shared_memory as sm
import sys
import uuid
import struct

# PyPi libraries
import numpy as np
import pandas as pd # Used only for convenience

# How wide can your strings get?
STR_WIDTH = 1024
# Data set size used only for generating fake data
DATA_SET_SIZE = int(1e6)

# A unique name that other processes can find the memory by
SHARED_MEMORY_NAME = 'SPECIAL_NAME_FOR_QUASI_77500330'
# Some example data, update yours to match the data you want to process
DATA_FIELDS = [
    ('id', str),
    ('num1', int),
    ('num2', float),
    ('num3', float),
    ('num4', float)
]
# A format string which matches the data above
STRUCT_FMT_STR = f'<{STR_WIDTH}sqddd'
# Calculate the stride
STRUCT_SIZE = struct.calcsize(STRUCT_FMT_STR)
# Will store the total number of samples for convenience
MEM_SIZE_FMT_STR = '<q'
MEM_SIZE_SIZE = struct.calcsize(MEM_SIZE_FMT_STR)
# What encoding to store strings in
#   (if you change this, update struct format string too)
ENCODING = 'UTF-8'
# The path to your dataset or whatever, I used CSV for convenience
FPATH = '77500330_tmp.csv'
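
# Layout of the shared memory block used below:
#   [8-byte sample count][record 0][record 1]...[record N-1]
#   where each record is STRUCT_SIZE bytes packed with STRUCT_FMT_STR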

# This function puts data into the shared memory buffer
def put(shm, pos, *args):
    # Strings have to be stored as bytes strings, so convert those
    outargs = [
        bytes(x, encoding=ENCODING) if type(x) is str else x for x in args
    ]
    
    # Slot the data into the buffer
    posa = MEM_SIZE_SIZE + pos * STRUCT_SIZE
    posb = MEM_SIZE_SIZE + (pos + 1) * STRUCT_SIZE
    shm.buf[posa: posb] = struct.pack(STRUCT_FMT_STR, *outargs)

# This function gets data from the shared memory buffer
def get(shm, pos):
    # Retrieve the data from the buffer
    posa = MEM_SIZE_SIZE + pos * STRUCT_SIZE
    posb = MEM_SIZE_SIZE + (pos + 1) * STRUCT_SIZE
    res = struct.unpack(STRUCT_FMT_STR, shm.buf[posa: posb])
    # Strings from bytes (see put)
    outval = [
        str(x.rstrip(b'\x00'), encoding=ENCODING) if type(x) is bytes else x
        for x in res
    ]
    return outval

# This function stores the number of samples in the buffer
#   makes it easier for consumers to know how much there is
def putsize(shm, total_samples):
    size = struct.pack(MEM_SIZE_FMT_STR, total_samples)
    shm.buf[0: MEM_SIZE_SIZE] = size

# This function retrieves the number of samples in the buffer
def getsize(shm):
    size = struct.unpack(MEM_SIZE_FMT_STR, shm.buf[0: MEM_SIZE_SIZE])
    return size[0]

# This is a function that makes data for my testing
#   you don't need it after you learn to get the provider/consumer working
def makedata():
    print('makedata')
    
    rng = np.random.default_rng(seed=42)
    
    id = [str(uuid.uuid4()) for i in range(DATA_SET_SIZE)]
    num1 = rng.integers(0, 1e3, size=DATA_SET_SIZE)
    num2 = rng.random(size=DATA_SET_SIZE)
    num3 = rng.random(size=DATA_SET_SIZE)
    num4 = rng.random(size=DATA_SET_SIZE)
    
    data = pd.DataFrame(
        {'id': id, 'num1': num1, 'num2': num2, 'num3': num3, 'num4': num4},
        columns=[x[0] for x in DATA_FIELDS])
    data = data.astype({x[0]: x[1] for x in DATA_FIELDS})
    
    data.to_csv(FPATH)

# This function acts as the provider for the data
#   It must live as long as any consumer still needs data
#   Consumers can be started at any point after this reports 'ready'
def provider():
    print('provider started')
    # Replace this line with however you read your data
    data = pd.read_csv(FPATH, index_col=0)
    # This calculates the total size of the memory buffer to allocate
    total_size = MEM_SIZE_SIZE + len(data) * STRUCT_SIZE
    # Create the shared memory buffer
    shared_memory_block = sm.SharedMemory(name=SHARED_MEMORY_NAME,
        create=True, size=total_size)
    # Store the number of samples in the buffer
    putsize(shared_memory_block, len(data))
    # Store the data in the buffer
    for i in range(len(data)):
        put(shared_memory_block, i, *data.loc[i].values)
    # A testing block to demonstrate that data gets in and out
    #   All three lines should print exactly the same values
    #   (this must run before unlink(), or attaching by name will fail)
    if True:
        smb2 = sm.SharedMemory(name=SHARED_MEMORY_NAME)
        print(list(data.loc[8].values))
        print(get(shared_memory_block, 8))
        print(get(smb2, 8))
        smb2.close()
    # Wait until every consumer is done with the data
    print('ready - press ctrl-c to stop')
    try:
        input()
    except KeyboardInterrupt:
        pass
    # When all processes are done, release and unlink the memory
    print('exiting ...')
    shared_memory_block.close()
    shared_memory_block.unlink()
    print('done!')

# This function acts as the consumer of the data
#   Many of these consumers can exist at the same time
#   Consumers can also be started and stopped many times for a single provider
def consumer():
    print('consumer started')
    # Link to the shared memory block created by the producer
    #   If this line fails, check that the producer is in the 'ready' state
    shared_memory_block = sm.SharedMemory(name=SHARED_MEMORY_NAME)
    # Find out how many samples are available in the buffer
    total_samples = getsize(shared_memory_block)
    # Iterate through them, doing whatever it is that you need to do
    for i in range(total_samples):
        # Simply call 'get' on each index to retrieve the data
        mydata = get(shared_memory_block, i)
        print(mydata)
    # Detach from the shared memory when finished (the provider owns unlinking)
    shared_memory_block.close()

if __name__ == '__main__':
    # Pick a role based on the first command line argument
    if len(sys.argv) < 2:
        print('usage: provider | consumer | makedata')
    elif sys.argv[1] == 'provider':
        provider()
    elif sys.argv[1] == 'consumer':
        consumer()
    elif sys.argv[1] == 'makedata':
        makedata()
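
Regarding the note above about long strings wasting space: below is a separate minimal sketch (not part of the script above) of one way to pack variable-length strings into their own shared-memory block with an offset table, so each main record could store just an integer index instead of a fixed STR_WIDTH-byte field. The block name and helper names here are made up for illustration.

# Sketch: variable-length strings in their own shared memory block.
# Layout: [8-byte count][start0, end0, start1, end1, ...][all bytes back to back]
import multiprocessing.shared_memory as sm
import struct

STR_BLOCK_NAME = 'STRINGS_FOR_QUASI_77500330'  # illustrative name

def pack_strings(strings):
    # Encode every string and remember where each one starts and ends
    blobs = [s.encode('UTF-8') for s in strings]
    offsets, pos = [], 0
    for b in blobs:
        offsets.append((pos, pos + len(b)))
        pos += len(b)
    header = struct.pack('<q', len(blobs))
    table = struct.pack(f'<{2 * len(blobs)}q',
                        *[v for pair in offsets for v in pair])
    payload = header + table + b''.join(blobs)
    block = sm.SharedMemory(name=STR_BLOCK_NAME, create=True, size=len(payload))
    block.buf[:len(payload)] = payload
    return block

def get_string(block, i):
    # Look up the (start, end) pair for string i, then slice it out of the blob
    count = struct.unpack_from('<q', block.buf, 0)[0]
    start, end = struct.unpack_from('<2q', block.buf, 8 + 16 * i)
    data_base = 8 + 16 * count
    return bytes(block.buf[data_base + start: data_base + end]).decode('UTF-8')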
