C语言 Python套接字接收过时的数据,因为主线程time.sleep()ing

k7fdbhmy  于 2023-08-03  发布在  Python
关注(0)|答案(1)|浏览(102)

我正在从socket接收数据。我用socket.recv(size)连接到socket并在while循环中接收特定大小的数据。
在高接收频率下,所有工作正常。但是,如果我在while循环中添加一个time.sleep(sec),我开始每次都收到过时的值。看起来像套接字充满了旧的数据,只要主机发送数据高达0.002次在一秒钟内,我只能接收过时的数据与我的1秒的接收频率(作为例子)。我不能从socket共享数据(只要它对于post来说太宽太重),但下面是代码:

import ctypes
import datetime
import logging
import socket
import time

from app.service.c_structures import RTDStructure
logging.basicConfig(level=logging.DEBUG)

class RTDSerializer:
    def __init__(self, ip: str, port: int = 29000, frequency: float = 0.002):
        self.data: dict = {}
        self.ip = ip
        self.port = port
        self.frequency = frequency
        self.sock = None
        self.struct_size = ctypes.sizeof(RTDStructure)
        self.logger = logging
        print(ctypes.sizeof(RTDStructure))

    def connect(self):
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.ip, self.port))
            self.sock.settimeout(None)
            logging.debug(f"Socket connect [{self.ip}:{self.port}] --> Ok")
            while True:
                c_structure = RTDStructure.from_buffer_copy(self.receive_raw_data() * ctypes.sizeof(RTDStructure))
                self.data = self.ctypes_to_dict(c_structure)
                print(datetime.datetime.now(), self.data['move_des_q'])
                time.sleep(self.frequency)
        except Exception as error:
            logging.error(f"Socket connect [{self.ip}:{self.port}] --> False\n{error}")
            return 0

    def receive_raw_data(self) -> bytes or connect:
        raw_data = self.sock.recv(self.struct_size)
        if raw_data == b'':
            logging.error('Connection lost')
            return self.connect()
        return raw_data

    def ctypes_to_dict(self, ctypes_obj) -> dict or list:
        if isinstance(ctypes_obj, ctypes.Structure):
            data_dict = {}
            for field_name, field_type in ctypes_obj.get_fields():
                field_value = getattr(ctypes_obj, field_name)
                if isinstance(field_value, (ctypes.Structure, ctypes.Array)):
                    data_dict[field_name] = self.ctypes_to_dict(field_value)
                else:
                    data_dict[field_name] = field_value
            return data_dict
        elif isinstance(ctypes_obj, ctypes.Array):
            data_list = []
            for element in ctypes_obj:
                if isinstance(element, (ctypes.Structure, ctypes.Array)):
                    data_list.append(self.ctypes_to_dict(element))
                else:
                    data_list.append(element)
            return data_list

if __name__ == '__main__':

    rtd = RTDSerializer(ip='192.168.0.226', port=29000, frequency=0.05)
    rtd.connect()

字符串
我从socket接收数据。它是一个C值的字节串。我用ctypes结构序列化它并转换成字典。序列化算法对于本主题并不重要。
此外,套接字有时返回0字节,所以,我需要在receive_raw_data func中检查这个问题。
我尝试强制重新连接while循环。这解决了过时信息的问题。就像这样:

def connect(self):
        try:
            logging.debug(f"Socket connect [{self.ip}:{self.port}] --> Ok")
            while True:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.ip, self.port))
                self.sock.settimeout(None)
                c_structure = RTDStructure.from_buffer_copy(self.receive_raw_data() * ctypes.sizeof(RTDStructure))
                self.data = self.ctypes_to_dict(c_structure)
                print(datetime.datetime.now(), self.data['move_des_q'])
                time.sleep(self.frequency)
                self.sock.close()
        except Exception as error:
            logging.error(f"Socket connect [{self.ip}:{self.port}] --> False\n{error}")
            return 0


但是,当我使用非常高的接收频率时,它会影响工作速度。就主机发送频率而言,这非常重要。
那么,我该如何解决这个问题呢?
套接字断开有什么问题?
为什么套接字会被填满而不通过过时的数据?
我正在考虑clear_buffer函数,它将删除所有未使用的数据。当线程正在睡眠时,但是,这会有点不合理的复杂...有什么想法吗

UPD:也许我应该写一个打印线程,每次只唤醒一次,通过frequency瓦尔设置,打印data_buffer的当前值,然后用socket填充,然后再次休眠,同时socket线程会高频率运行,覆盖这个data_buffer值?
新的小UPD与微观的例子:

import ctypes
import datetime
import logging
import socket
import time

logging.basicConfig(level=logging.DEBUG)

class RTDReceiver:
    def __init__(self, ip: str, port: int, frequency: float = 0.002):
        self.data: dict = {}
        self.ip = ip
        self.port = port
        self.frequency = frequency
        self.sock = None
        self.struct_size = 1064  # 1064 bytes in my case. 

    def connect(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.ip, self.port))
        self.sock.settimeout(None)

    def receive_raw_data(self) -> bytes:
        raw_data = self.sock.recv(self.struct_size)
        if raw_data == b'':
            logging.error('Connection lost')
            self.connect()
        time.sleep(self.frequency)
        return raw_data

if __name__ == '__main__':

    rec = RTDReceiver(ip='here your ip', port='here is your port',  frequency=0.02)
    rec.connect()
    while True:
        print(rec.receive_raw_data())

6ie5vjzr

6ie5vjzr1#

为什么套接字会被填满而不通过过时的数据?
因为这就是套接字的工作方式。尤其是像您正在使用的面向流的套接字。按发送顺序接收字节。在读取后面的字节之前,必须先读取前面的字节。
你说
只要主机每秒发送数据达0.002次
,但这将是一个非常低的数据速率,除非您的RTDStructure确实非常大。我猜你的意思是主机发送数据的频率为每0.002秒一次==每秒500次。不管是哪种,如果你想读取最新的数据,接收者需要跟上发送者。
套接字断开有什么问题?
这很难确定,但我推测接收方远远福尔斯于发送方,以至于套接字的接收缓冲区填满,于是机器停止接受来自发送方的新数据包。虽然这本身不会导致连接关闭,但如果它持续足够长的时间(当然可以),那么发送方很可能会从其末端关闭连接。
此外,套接字有时返回0字节,所以,我需要在receive_raw_data func中检查这个问题。
不,你有。在一个调用中接收的字节数也可能比您请求的要少,因此为了健壮性,您需要提供通过两个或多个recv()调用接收传输的功能。这就是流套接字的工作方式。
那么,我该如何解决这个问题呢?
有以下几种可能性:

  • 尽可能快地接收消息,也许在专用于此的线程中。从那些 * 在客户端 *,在任何频率,你想要的,丢弃未选中的。
  • 使服务器的数据速率可在每个连接的基础上配置。客户端连接,告诉服务器发送数据的频率,然后以该频率接收数据。
  • 更改消息交换模式。您目前有一个特别简单的方法,客户端只需要连接,然后服务器就可以以它选择的任何速率推送数据,而不需要来自客户端的任何进一步消息。如果切换到请求/响应模式,客户端必须向服务器发送某种消息(可能只是一个字节),以便服务器发送数据,那么服务器就不能轻松地超过客户端。但是,这需要对服务器进行最多的更改,并且请求延迟可能会降低您可以实现的最大数据速率。
  • 作为前一种的变化,客户机可以在每次想要读取项目时建立新的连接,然后在仅阅读一个项目之后断开连接。* 可能 * 每次都会获得新的数据,但它会增加更多的延迟。

相关问题