python使用redis订阅发布来传输大粒度的数据

ni65a41a  于 2021-06-09  发布在  Redis
关注(0)|答案(0)|浏览(206)

运行时环境。

macOS
redis-5.05

python-3.7.6
redis-3.4.1

redis配置信息。

127.0.0.1:6379> info memory

# Memory

used_memory:1052688
used_memory_human:1.00M
used_memory_rss:2084864
used_memory_rss_human:1.99M
used_memory_peak:135268640
used_memory_peak_human:129.00M
used_memory_peak_perc:0.78%
used_memory_overhead:1037646
used_memory_startup:987952
used_memory_dataset:15042
used_memory_dataset_perc:23.24%
allocator_allocated:1007568
allocator_active:2046976
allocator_resident:2046976
total_system_memory:8589934592
total_system_memory_human:8.00G
used_memory_lua:37888
used_memory_lua_human:37.00K
used_memory_scripts:0
used_memory_scripts_human:0B
number_of_cached_scripts:0
maxmemory:0
maxmemory_human:0B
maxmemory_policy:noeviction
allocator_frag_ratio:2.03
allocator_frag_bytes:1039408
allocator_rss_ratio:1.00
allocator_rss_bytes:0
rss_overhead_ratio:1.02
rss_overhead_bytes:37888
mem_fragmentation_ratio:2.07
mem_fragmentation_bytes:1077296
mem_not_counted_for_evict:0
mem_replication_backlog:0
mem_clients_slaves:0
mem_clients_normal:49694
mem_aof_buffer:0
mem_allocator:libc
active_defrag_running:0
lazyfree_pending_objects:0

程序代码。


# - sub.py -

import redis
import numpy as np
rc = redis.StrictRedis()
ps = rc.pubsub()

def my_handler(data):
    arr_data = np.frombuffer(data["data"], dtype='u4')
    data.update({"data": len(arr_data)})
    print(data)

ps.subscribe(**{"demo": my_handler})
ps.run_in_thread()

# -- pub.py --

import redis
import time
import numpy

# data = numpy.arange(1024**2, dtype="u4")

data = numpy.arange(1024**2*16, dtype="u4")
rc = redis.StrictRedis()
for i in range(100):
    time.sleep(0.5)
    print(rc.publish("demo", data.tobytes()))

当我用它传输1mb的数据时,一切正常,当大小为64mb时,它有以下信息和异常信息。


# - sub.py -

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/client.py", line 3667, in run
    timeout=sleep_time)
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/client.py", line 3565, in get_message
    response = self.parse_response(block=False, timeout=timeout)
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/client.py", line 3451, in parse_response
    if not block and not conn.can_read(timeout=timeout):
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/connection.py", line 729, in can_read
    return self._parser.can_read(timeout)
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/connection.py", line 313, in can_read
    return self._buffer and self._buffer.can_read(timeout)
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/connection.py", line 223, in can_read
    raise_on_timeout=False)
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/connection.py", line 193, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.

# - pub.py -

1
0
0
...

我需要在示例化时添加一些参数来解决它吗?期待回复。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题