possibility使用kafka连接器从python opcua客户端获取数据到kafka主题

t9eec4r0  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(443)

有什么是 python opcua client . datachange_notification(self, node, val, data) 将作为 python opcua client 无论何时 client ,请参阅下面的脚本,是否可以发送到达的数据(在此上下文中是 val )Kafka的主题。

def datachange_notification(self, node, val, data):
        saveData = save_file.Savefile()
        #save data locally when data arrived according to the nodeid=2
        if str(node) == "Node(NumericNodeId(ns=2;i=2))":
            if val != None and val != 0:
                print("Python: New data change event", node, val)
                saveData.saveBlockByClient(val)
        #save data locally when data arrived according to the nodeid=3
        if str(node) == "Node(NumericNodeId(ns=2;i=3))":
            if val != None and val != 0:
                print("Python: New data change event", node, val)
                saveData.saveSourceByValidator(str(val))
nkkqxpd9

nkkqxpd91#

请尝试它是否适用于kafka python包-

from kafka import KafkaProducer

class KafkaOpcUaSubHandler(object): 
    def __init__(self):
        self.msg={}

    def connect_kafka(self, server, port):
        kafka_producer = KafkaProducer(
                bootstrap_server ='{}:{}'.format(server, port),
                retries=0, batch_size=0, compression_type=None
            )

    def kafka_message_producer(self, kafka_topic, msg):
        kafka_producer = self.connect_kafka(kafka_server, kafka_port)
        your_kafka_message = kafka_producer
                             .send(kafka_topic, json.dumps(msg, default=str)
                             .encode('utf-8'))
        kafka_producer.flush()

    def datachange_notification(self, node, val, data):
       self.your_function(node, val, self.msg, data)

    def your_function(self, node, val, msg, data):
        payload = []
        payload = [val, str(data.monitored_item.Value.SourceTimestamp)]
        self.kafka_message_producer(kafka_topic, payload)
xyhw6mcr

xyhw6mcr2#

有可能吗?当然。
导入一个kafka库并新建一个producer示例,然后发送一条消息
如果您使用评论中提到的asyncio客户机,我建议您研究aiokafka python库

相关问题