我正在从socket接收数据。我用socket.recv(size)
连接到socket并在while
循环中接收特定大小的数据。
在高接收频率下,所有工作正常。但是,如果我在while
循环中添加一个time.sleep(sec)
,我开始每次都收到过时的值。看起来像套接字充满了旧的数据,只要主机发送数据高达0.002次在一秒钟内,我只能接收过时的数据与我的1秒的接收频率(作为例子)。我不能从socket共享数据(只要它对于post来说太宽太重),但下面是代码:
import ctypes
import datetime
import logging
import socket
import time
from app.service.c_structures import RTDStructure
logging.basicConfig(level=logging.DEBUG)
class RTDSerializer:
def __init__(self, ip: str, port: int = 29000, frequency: float = 0.002):
self.data: dict = {}
self.ip = ip
self.port = port
self.frequency = frequency
self.sock = None
self.struct_size = ctypes.sizeof(RTDStructure)
self.logger = logging
print(ctypes.sizeof(RTDStructure))
def connect(self):
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.ip, self.port))
self.sock.settimeout(None)
logging.debug(f"Socket connect [{self.ip}:{self.port}] --> Ok")
while True:
c_structure = RTDStructure.from_buffer_copy(self.receive_raw_data() * ctypes.sizeof(RTDStructure))
self.data = self.ctypes_to_dict(c_structure)
print(datetime.datetime.now(), self.data['move_des_q'])
time.sleep(self.frequency)
except Exception as error:
logging.error(f"Socket connect [{self.ip}:{self.port}] --> False\n{error}")
return 0
def receive_raw_data(self) -> bytes or connect:
raw_data = self.sock.recv(self.struct_size)
if raw_data == b'':
logging.error('Connection lost')
return self.connect()
return raw_data
def ctypes_to_dict(self, ctypes_obj) -> dict or list:
if isinstance(ctypes_obj, ctypes.Structure):
data_dict = {}
for field_name, field_type in ctypes_obj.get_fields():
field_value = getattr(ctypes_obj, field_name)
if isinstance(field_value, (ctypes.Structure, ctypes.Array)):
data_dict[field_name] = self.ctypes_to_dict(field_value)
else:
data_dict[field_name] = field_value
return data_dict
elif isinstance(ctypes_obj, ctypes.Array):
data_list = []
for element in ctypes_obj:
if isinstance(element, (ctypes.Structure, ctypes.Array)):
data_list.append(self.ctypes_to_dict(element))
else:
data_list.append(element)
return data_list
if __name__ == '__main__':
rtd = RTDSerializer(ip='192.168.0.226', port=29000, frequency=0.05)
rtd.connect()
字符串
我从socket接收数据。它是一个C值的字节串。我用ctypes结构序列化它并转换成字典。序列化算法对于本主题并不重要。
此外,套接字有时返回0字节,所以,我需要在receive_raw_data
func中检查这个问题。
我尝试强制重新连接while
循环。这解决了过时信息的问题。就像这样:
def connect(self):
try:
logging.debug(f"Socket connect [{self.ip}:{self.port}] --> Ok")
while True:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.ip, self.port))
self.sock.settimeout(None)
c_structure = RTDStructure.from_buffer_copy(self.receive_raw_data() * ctypes.sizeof(RTDStructure))
self.data = self.ctypes_to_dict(c_structure)
print(datetime.datetime.now(), self.data['move_des_q'])
time.sleep(self.frequency)
self.sock.close()
except Exception as error:
logging.error(f"Socket connect [{self.ip}:{self.port}] --> False\n{error}")
return 0
型
但是,当我使用非常高的接收频率时,它会影响工作速度。就主机发送频率而言,这非常重要。
那么,我该如何解决这个问题呢?
套接字断开有什么问题?
为什么套接字会被填满而不通过过时的数据?
我正在考虑clear_buffer
函数,它将删除所有未使用的数据。当线程正在睡眠时,但是,这会有点不合理的复杂...有什么想法吗
UPD:也许我应该写一个打印线程,每次只唤醒一次,通过frequency瓦尔设置,打印data_buffer
的当前值,然后用socket填充,然后再次休眠,同时socket线程会高频率运行,覆盖这个data_buffer
值?
新的小UPD与微观的例子:
import ctypes
import datetime
import logging
import socket
import time
logging.basicConfig(level=logging.DEBUG)
class RTDReceiver:
def __init__(self, ip: str, port: int, frequency: float = 0.002):
self.data: dict = {}
self.ip = ip
self.port = port
self.frequency = frequency
self.sock = None
self.struct_size = 1064 # 1064 bytes in my case.
def connect(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.ip, self.port))
self.sock.settimeout(None)
def receive_raw_data(self) -> bytes:
raw_data = self.sock.recv(self.struct_size)
if raw_data == b'':
logging.error('Connection lost')
self.connect()
time.sleep(self.frequency)
return raw_data
if __name__ == '__main__':
rec = RTDReceiver(ip='here your ip', port='here is your port', frequency=0.02)
rec.connect()
while True:
print(rec.receive_raw_data())
型
1条答案
按热度按时间6ie5vjzr1#
为什么套接字会被填满而不通过过时的数据?
因为这就是套接字的工作方式。尤其是像您正在使用的面向流的套接字。按发送顺序接收字节。在读取后面的字节之前,必须先读取前面的字节。
你说
只要主机每秒发送数据达0.002次
,但这将是一个非常低的数据速率,除非您的
RTDStructure
确实非常大。我猜你的意思是主机发送数据的频率为每0.002秒一次==每秒500次。不管是哪种,如果你想读取最新的数据,接收者需要跟上发送者。套接字断开有什么问题?
这很难确定,但我推测接收方远远福尔斯于发送方,以至于套接字的接收缓冲区填满,于是机器停止接受来自发送方的新数据包。虽然这本身不会导致连接关闭,但如果它持续足够长的时间(当然可以),那么发送方很可能会从其末端关闭连接。
此外,套接字有时返回0字节,所以,我需要在
receive_raw_data
func中检查这个问题。不,你有。在一个调用中接收的字节数也可能比您请求的要少,因此为了健壮性,您需要提供通过两个或多个
recv()
调用接收传输的功能。这就是流套接字的工作方式。那么,我该如何解决这个问题呢?
有以下几种可能性: