python—如何完全控制与flask应用程序并行运行的进程(启动/终止)?

jfgube3f  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(608)

这是我的应用程序体系结构:

我的代码中有一个 pedestrian.py 文件,它使用 while 循环从 rtsp 链接读取帧,在执行行人检测(检测代码见此链接)之后,将处理后的帧缓存到 redis 中。
(请注意,循环每次产生的输出帧都会覆盖上一次循环的输出,这意味着 redis 中在任何时刻都只存在一帧。)
然后在flask应用程序中,我从redis读取已处理的帧并将其发送给客户机。
这是我的行人检测代码:

  1. from redis import Redis
  2. from concurrent.futures import ThreadPoolExecutor
  3. import cv2
  4. import torch
  5. from os import environ
  6. r = Redis('111.222.333.444')
  7. class RealTimeTracking(object):
  8. """
  9. This class is built to get frame from rtsp link and continuously
  10. save each frame in the output directory. then we use flask to give it
  11. as service to client.
  12. Args:
  13. args: parse_args inputs
  14. cfg: deepsort dict and yolo-model cfg from server_cfg file
  15. """
  16. def __init__(self, cfg, args):
  17. # Create a VideoCapture object
  18. self.cfg = cfg
  19. self.args = args
  20. use_cuda = self.args.use_cuda and torch.cuda.is_available()
  21. if not use_cuda:
  22. raise UserWarning("Running in cpu mode!")
  23. self.detector = build_detector(cfg, use_cuda=use_cuda)
  24. self.deepsort = build_tracker(cfg, use_cuda=use_cuda)
  25. self.class_names = self.detector.class_names
  26. self.vdo = cv2.VideoCapture(self.args.input)
  27. self.status, self.frame = None, None
  28. self.total_frames = int(cv2.VideoCapture.get(self.vdo, cv2.CAP_PROP_FRAME_COUNT))
  29. self.im_width = int(self.vdo.get(cv2.CAP_PROP_FRAME_WIDTH))
  30. self.im_height = int(self.vdo.get(cv2.CAP_PROP_FRAME_HEIGHT))
  31. self.output_frame = None
  32. self.thread = ThreadPoolExecutor(max_workers=1)
  33. self.thread.submit(self.update)
  34. print('streaming started ...')
  35. def update(self):
  36. while True:
  37. if self.vdo.isOpened():
  38. (self.status, self.frame) = self.vdo.read()
  39. def run(self):
  40. while True:
  41. try:
  42. if self.status:
  43. frame = self.frame.copy()
  44. # frame = cv2.resize(frame, (640, 480))
  45. self.detection(frame=frame)
  46. frame_to_bytes = cv2.imencode('.jpg', frame)[1].tobytes()
  47. r.set('frame', frame_to_bytes)
  48. except AttributeError:
  49. pass
  50. def detection(self, frame):
  51. im = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
  52. # do detection
  53. bbox_xywh, cls_conf, cls_ids = self.detector(im)
  54. if bbox_xywh is not None:
  55. # select person class
  56. mask = cls_ids == 0
  57. bbox_xywh = bbox_xywh[mask]
  58. bbox_xywh[:, 3:] *= 1.2 # bbox dilation just in case bbox too small
  59. cls_conf = cls_conf[mask]
  60. # do tracking
  61. outputs = self.deepsort.update(bbox_xywh, cls_conf, im)
  62. # draw boxes for visualization
  63. if len(outputs) > 0:
  64. self.draw_boxes(img=frame, output=outputs)
  65. @staticmethod
  66. def draw_boxes(img, output, offset=(0, 0)):
  67. for i, box in enumerate(output):
  68. x1, y1, x2, y2, identity = [int(ii) for ii in box]
  69. x1 += offset[0]
  70. x2 += offset[0]
  71. y1 += offset[1]
  72. y2 += offset[1]
  73. # box text and bar
  74. color = compute_color_for_labels(identity)
  75. label = '{}{:d}'.format("", identity)
  76. t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 2, 2)[0]
  77. cv2.rectangle(img, (x1, y1), (x2, y2), color, 3)
  78. cv2.rectangle(img, (x1, y1), (x1 + t_size[0] + 3, y1 + t_size[1] + 4), color, -1)
  79. cv2.putText(img, label, (x1, y1 + t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 2, [255, 255, 255], 2)
  80. return img
  81. if __name__ == "__main__":
  82. args = parse_args() # argument: --rtsp_link = 'rtsp://me@111.222.333.444/Channels/105'
  83. cfg = get_config()
  84. cfg.merge_from_dict(model)
  85. cfg.merge_from_dict(deep_sort_dict)
  86. vdo_trk = RealTimeTracking(cfg, args)
  87. vdo_trk.run()

这是flask服务器的代码 app.py :

  1. from dotenv import load_dotenv
  2. from time import sleep
  3. from os import getenv
  4. from os.path import join
  5. import subprocess
  6. from flask import Response, Flask
  7. from config.config import DevelopmentConfig
  8. from redis import Redis
  9. r = Redis('111.222.333.444')
  10. app = Flask(__name__)
  11. def gen():
  12. while True:
  13. frame = r.get('frame')
  14. if frame is not None:
  15. yield b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
  16. @app.route('/')
  17. def video_feed():
  18. """Video streaming route. Put this in the src attribute of an img tag."""
  19. return Response(gen(),
  20. mimetype='multipart/x-mixed-replace; boundary=frame')
  21. if __name__ == '__main__':
  22. load_dotenv()
  23. app.config.from_object(DevelopmentConfig)
  24. cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/105']
  25. p = subprocess.Popen(cmd)
  26. sleep(6)
  27. app.run()

这段代码在我的系统中运行得很好。
如您所见,在运行flask服务器之前,我使用cmd命令在rtsp链接上运行行人检测。
但是我真正需要的是能够在不同的摄像机之间切换。我的意思是,当flask服务器运行时,我希望能够随时通过一个请求终止 pedestrian.py 进程,并用新的 --rtsp_link 参数重新启动 pedestrian.py(切换到另一个摄像机)。
像这样:

  1. @app.route('/cam1'):
  2. def cam1():
  3. stop('pedestrian.py')
  4. cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/101']
  5. p = subprocess.Popen(cmd)
  6. @app.route('/cam2'):
  7. def cam2():
  8. stop('pedestrian.py')
  9. cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/110']
  10. p = subprocess.Popen(cmd)

我的知识可能不够好。我可能需要使用post方法和身份验证。
你能告诉我如何在这段代码中实现这样的东西吗?

3yhwsihp

3yhwsihp1#

我找到了一种自动启动/停止行人检测的方法。更多详情请参见我的代码仓库(repo):
from os.path import join
from os import getenv, environ
from dotenv import load_dotenv
import argparse
from threading import Thread

  1. from redis import Redis
  2. from flask import Response, Flask, jsonify, request, abort
  3. from rtsp_threaded_tracker import RealTimeTracking
  4. from server_cfg import model, deep_sort_dict
  5. from config.config import DevelopmentConfig
  6. from utils.parser import get_config
  7. redis_cache = Redis('127.0.0.1')
  8. app = Flask(__name__)
  9. environ['in_progress'] = 'off'
  10. def parse_args():
  11. """
  12. Parses the arguments
  13. Returns:
  14. argparse Namespace
  15. """
  16. assert 'project_root' in environ.keys()
  17. project_root = getenv('project_root')
  18. parser = argparse.ArgumentParser()
  19. parser.add_argument("--input",
  20. type=str,
  21. default=getenv('camera_stream'))
  22. parser.add_argument("--model",
  23. type=str,
  24. default=join(project_root,
  25. getenv('model_type')))
  26. parser.add_argument("--cpu",
  27. dest="use_cuda",
  28. action="store_false", default=True)
  29. args = parser.parse_args()
  30. return args
  31. def gen():
  32. """
  33. Returns: video frames from redis cache
  34. """
  35. while True:
  36. frame = redis_cache.get('frame')
  37. if frame is not None:
  38. yield b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
  39. def pedestrian_tracking(cfg, args):
  40. """
  41. starts the pedestrian detection on rtsp link
  42. Args:
  43. cfg:
  44. args:
  45. Returns:
  46. """
  47. tracker = RealTimeTracking(cfg, args)
  48. tracker.run()
  49. def trigger_process(cfg, args):
  50. """
  51. triggers pedestrian_tracking process on rtsp link using a thread
  52. Args:
  53. cfg:
  54. args:
  55. Returns:
  56. """
  57. try:
  58. t = Thread(target=pedestrian_tracking, args=(cfg, args))
  59. t.start()
  60. return jsonify({"message": "Pedestrian detection started successfully"})
  61. except Exception:
  62. return jsonify({'message': "Unexpected exception occured in process"})
  63. @app.errorhandler(400)
  64. def bad_argument(error):
  65. return jsonify({'message': error.description['message']})
  66. # Routes
  67. @app.route('/stream', methods=['GET'])
  68. def stream():
  69. """
  70. Provides video frames on http link
  71. Returns:
  72. """
  73. return Response(gen(),
  74. mimetype='multipart/x-mixed-replace; boundary=frame')
  75. @app.route("/run", methods=['GET'])
  76. def process_manager():
  77. """
  78. request parameters:
  79. run (bool): 1 -> start the pedestrian tracking
  80. 0 -> stop it
  81. camera_stream: str -> rtsp link to security camera
  82. :return:
  83. """
  84. # data = request.args
  85. data = request.args
  86. status = data['run']
  87. status = int(status) if status.isnumeric() else abort(400, {'message': f"bad argument for run {data['run']}"})
  88. if status == 1:
  89. # if pedestrian tracking is not running, start it off!
  90. try:
  91. if environ.get('in_progress', 'off') == 'off':
  92. global cfg, args
  93. vdo = data.get('camera_stream')
  94. if vdo is not None:
  95. args.input = int(vdo)
  96. environ['in_progress'] = 'on'
  97. return trigger_process(cfg, args)
  98. elif environ.get('in_progress') == 'on':
  99. # if pedestrian tracking is running, don't start another one (we are short of gpu resources)
  100. return jsonify({"message": " Pedestrian detection is already in progress."})
  101. except Exception:
  102. environ['in_progress'] = 'off'
  103. return abort(503)
  104. elif status == 0:
  105. if environ.get('in_progress', 'off') == 'off':
  106. return jsonify({"message": "pedestrian detection is already terminated!"})
  107. else:
  108. environ['in_progress'] = 'off'
  109. return jsonify({"message": "Pedestrian detection terminated!"})
  110. if __name__ == '__main__':
  111. load_dotenv()
  112. app.config.from_object(DevelopmentConfig)
  113. # BackProcess Initialization
  114. args = parse_args()
  115. cfg = get_config()
  116. cfg.merge_from_dict(model)
  117. cfg.merge_from_dict(deep_sort_dict)
  118. # Start the flask app
  119. app.run()
展开查看全部

相关问题