我正在将我的网络摄像头feed流式传输给kakfa消费者,以便进行面部情绪分析并将其存储到db。
不过,我注意到消费者方面有相当大的滞后(3秒)。在解决延迟的过程中,降低了生产者端的消息/图像质量。然而,这并没有改善延迟,并且进一步恶化了用户端的图像质量。
Kafka制作人代码如下:
from confluent_kafka import Producer
import cv2
import time, sys
p = Producer({'bootstrap.servers': 'localhost'})
encode_param=[int(cv2.IMWRITE_JPEG_QUALITY),90]
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
camera = cv2.VideoCapture(0)
try:
while (True):
success, frame = camera.read()
frame = cv2.resize(frame, None, fx=0.3, fy=0.3)
result, buffer = cv2.imencode('.jpg', frame, encode_param)
#ret, buffer = cv2.imencode('.jpg', frame)
p.poll(0)
p.produce('newtopic', buffer.tobytes(), callback=delivery_report)
print('sent')
p.flush()
time.sleep(0.1)
except:
print("\nExiting.")
sys.exit(1)
以下是Kafka消费者的代码:
import cv2
import sys
import os
import imutils
import logging
from confluent_kafka import Consumer, KafkaError
from confluent_kafka.admin import AdminClient
def example_delete_topics(a, topics):
fs = a.delete_topics(topics, operation_timeout=30)
# Wait for operation to finish.
for topic, f in fs.items():
try:
f.result() # The result itself is None
print("Topic {} deleted".format(topic))
except Exception as e:
print("Failed to delete topic {}: {}".format(topic, e))
a = AdminClient({'bootstrap.servers': 'localhost'})
example_delete_topics(a, ['newtopic'])
c = Consumer({
'bootstrap.servers': 'localhost',
'group.id': 'mygroup',
'auto.offset.reset': 'latest'
})
camID = 0
c.subscribe(['newtopic'])
(H, W) = (None, None)
while True:
msg = c.poll(1.0)
if msg is None:
print ("No Message")
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print ("Received")
img_bytes = msg.value()
frame = np.array(Image.open(io.BytesIO(img_bytes)))
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame = imutils.resize(frame, width=800)
if W is None or H is None:
(H, W) = frame.shape[:2]
#print (H,W)
try:
rects, landmarks = face_detect.detect_face(frame,20)
rects_tracker = []
for (i,rect) in enumerate(rects):
# Intensive calculations on sentiment analysis
draw_border(frame,(rect[0],rect[1]),(rect[0] + rect[2],rect[1]+rect[3]),(127,255,255),1,10,20)
cv2.putText(frame,final_result,(rect[0],rect[1]),cv2.FONT_HERSHEY_SIMPLEX,0.5,(127,255,255),1,cv2.LINE_AA)
except Exception as e :
logging.info('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e)
pass
cv2.imshow('Sample', frame)
key = cv2.waitKey(20)
if key == 27: # exit on ESC
end = time.time()
seconds = end - start
fps = frame_counter / seconds;
print ("Estimated frames per second : {0}".format(fps))
break
c.close()
尽管如此,用户端的代码非常繁重,需要很多步骤。在没有Kafka的直接网络摄像头上,每帧需要150毫秒的时间。但对于Kafka来说,大约需要3秒钟。
有人能帮我把它减少到一个很大的程度,使接近实时吗
暂无答案!
目前还没有任何答案,快来回答吧!