confluentkafka:当流式传输网络摄像头提要时,kafka消费者的React迟钝

ev7lccsx  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(333)

我正在将我的网络摄像头feed流式传输给kakfa消费者,以便进行面部情绪分析并将其存储到db。
不过,我注意到消费者方面有相当大的滞后(3秒)。在解决延迟的过程中,降低了生产者端的消息/图像质量。然而,这并没有改善延迟,并且进一步恶化了用户端的图像质量。
Kafka制作人代码如下:

from confluent_kafka import Producer
import cv2
import time, sys

p = Producer({'bootstrap.servers': 'localhost'})

encode_param=[int(cv2.IMWRITE_JPEG_QUALITY),90]

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

camera = cv2.VideoCapture(0)

try:
    while (True):
        success, frame = camera.read()
        frame = cv2.resize(frame, None, fx=0.3, fy=0.3)
        result, buffer = cv2.imencode('.jpg', frame, encode_param)
        #ret, buffer = cv2.imencode('.jpg', frame)

        p.poll(0)
            p.produce('newtopic', buffer.tobytes(), callback=delivery_report)

        print('sent')
        p.flush()
        time.sleep(0.1)
except:
    print("\nExiting.")
    sys.exit(1)

以下是Kafka消费者的代码:

import cv2
import sys
import os
import imutils
import logging

from confluent_kafka import Consumer, KafkaError
from confluent_kafka.admin import AdminClient

def example_delete_topics(a, topics):
        fs = a.delete_topics(topics, operation_timeout=30)

        # Wait for operation to finish.
        for topic, f in fs.items():
            try:
                f.result()  # The result itself is None
                print("Topic {} deleted".format(topic))
            except Exception as e:
                print("Failed to delete topic {}: {}".format(topic, e))
a = AdminClient({'bootstrap.servers': 'localhost'})
example_delete_topics(a, ['newtopic'])
c = Consumer({
    'bootstrap.servers': 'localhost',
    'group.id': 'mygroup',
    'auto.offset.reset': 'latest'
})
camID = 0
c.subscribe(['newtopic'])
(H, W) = (None, None)

while True:
    msg = c.poll(1.0)

    if msg is None:
        print ("No Message")
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print ("Received")
    img_bytes = msg.value()
    frame = np.array(Image.open(io.BytesIO(img_bytes)))
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame = imutils.resize(frame, width=800)

    if W is None or H is None:
        (H, W) = frame.shape[:2]
    #print (H,W)
    try:
        rects, landmarks = face_detect.detect_face(frame,20)
        rects_tracker = []

        for (i,rect) in enumerate(rects):

            # Intensive calculations on sentiment analysis

            draw_border(frame,(rect[0],rect[1]),(rect[0] + rect[2],rect[1]+rect[3]),(127,255,255),1,10,20)
            cv2.putText(frame,final_result,(rect[0],rect[1]),cv2.FONT_HERSHEY_SIMPLEX,0.5,(127,255,255),1,cv2.LINE_AA)

    except Exception as e :
        logging.info('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e)
        pass

    cv2.imshow('Sample', frame)
    key = cv2.waitKey(20)
    if key == 27:  # exit on ESC
        end = time.time()
        seconds = end - start
        fps  = frame_counter / seconds;
        print ("Estimated frames per second : {0}".format(fps))
                    break

c.close()

尽管如此,用户端的代码非常繁重,需要很多步骤。在没有Kafka的直接网络摄像头上,每帧需要150毫秒的时间。但对于Kafka来说,大约需要3秒钟。
有人能帮我把它减少到一个很大的程度,使接近实时吗

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题