import os
from threading import Event, Thread
from time import localtime, sleep, strftime
from typing import Iterator, Optional

import cv2
import numpy as np
from requests import request


class Camera:
    """Background reader for an RTSP camera stream.

    A worker thread continuously pulls frames from a ``cv2.VideoCapture``
    and keeps only the most recent one, so consumers always see the latest
    frame instead of a stale, buffered one.

    The stream URL has the form::

        rtsp://<user>:<password>@<host>:554/cam/realmonitor?channel=1&subtype=0

    The instance can be used as a context manager; leaving the ``with``
    block calls :meth:`close`.
    """

    # SECURITY NOTE(review): credentials are hard-coded in source control.
    # Consider loading the URL from configuration or the environment.
    __RTSP_URL = "rtsp://admin:hmkj6688@192.168.1.254:554/cam/realmonitor?channel=1&subtype=0"
    # Endpoint that `detr` posts frames to.
    __DETR_URL = "http://192.168.1.48:5080/"
    # Upper bound (seconds) on how long `close` waits for the reader thread;
    # `VideoCapture.read` may block, and `close` must not hang with it.
    __JOIN_TIMEOUT = 5.0

    def __init__(self, rtsp: Optional[str] = None):
        """Open the stream and block until the first frame is available.

        Args:
            rtsp: Stream URL. ``None`` selects the built-in default URL.

        Raises:
            RuntimeError: If the stream cannot be opened or read.
        """
        self.__handler = None  # active cv2.VideoCapture, or None
        self.__data = None     # most recent frame, or None
        self.__thread = None   # reader thread, or None
        self.__stop = None     # Event telling the current reader to exit
        self.__error = None    # exception that terminated the reader, if any
        self.connect(rtsp)

    def __enter__(self) -> "Camera":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    @staticmethod
    def __current_time(fmt: str = "%Y-%m-%d_%H-%M-%S") -> str:
        """Return the local time formatted with *fmt*."""
        return strftime(fmt, localtime())

    @staticmethod
    def resize(img: np.ndarray, ratio: float = -1) -> np.ndarray:
        """Scale *img* so that each side is divided by *ratio*.

        Args:
            img: Image array; its first two dimensions are height and width.
            ratio: Divisor applied to width and height. Values above 1
                shrink the image, values between 0 and 1 enlarge it. The
                sentinel ``-1`` returns *img* unchanged.

        Returns:
            The resized image (or *img* itself for ``ratio == -1``).

        Raises:
            ValueError: If *ratio* is zero or negative (other than ``-1``).
        """
        if ratio == -1:
            return img
        if ratio <= 0:
            raise ValueError(f"ratio must be positive or -1, got {ratio!r}")
        height, width = img.shape[:2]
        # Clamp to one pixel: cv2.resize rejects a zero-sized target.
        target = max(1, int(width / ratio)), max(1, int(height / ratio))
        return cv2.resize(img, target, interpolation=cv2.INTER_AREA)

    @staticmethod
    def detr(img: np.ndarray, url: Optional[str] = None,
             timeout: Optional[float] = 30.0) -> np.ndarray:
        """Send *img* to the detection service and return the image it replies with.

        The frame is JPEG-encoded, uploaded as multipart field ``img`` via
        HTTP POST, and the response body is decoded as a colour image.

        Args:
            img: Image to analyse.
            url: Service endpoint. ``None`` selects the built-in default.
            timeout: Seconds to wait for the service; ``None`` waits forever.

        Returns:
            The decoded image from the response body.

        Raises:
            RuntimeError: If the frame cannot be encoded or the response
                body is not a decodable image.
            requests.RequestException: On connection errors, timeouts or a
                non-success HTTP status.
        """
        if url is None:
            url = Camera.__DETR_URL
        encoded_ok, encoded = cv2.imencode(".jpg", img)
        if not encoded_ok:
            raise RuntimeError("Fail To Encode Image.")
        files = {"img": ("from-python.jpg", encoded.tobytes(), "image/jpg")}
        resp = request("POST", url, files=files, timeout=timeout)
        resp.raise_for_status()
        result = cv2.imdecode(np.frombuffer(resp.content, np.uint8), cv2.IMREAD_COLOR)
        if result is None:
            # imdecode signals failure by returning None rather than raising.
            raise RuntimeError("Fail To Decode Detection Result.")
        return result

    def __read(self, handler, stop: Event) -> None:
        """Reader-thread body: publish frames from *handler* until *stop* is set.

        A failed read ends the loop; the exception is stored so that
        `connect` and `read_stream` can report it from the caller's thread.
        The capture is always released on exit.
        """
        try:
            while not stop.is_set():
                ok, frame = handler.read()
                if not ok:
                    raise RuntimeError("Fail To Read Data.")
                # Re-check so a reader that was told to stop while blocked in
                # read() does not overwrite state after close().
                if not stop.is_set():
                    self.__data = frame
        except Exception as exc:  # thread boundary: record, surface elsewhere
            if not stop.is_set():
                self.__error = exc
                self.__data = None
        finally:
            handler.release()
            if self.__handler is handler:
                self.__handler = None

    def connect(self, rtsp: Optional[str] = None) -> None:
        """Open *rtsp*, start the reader thread and wait for the first frame.

        Any previously running reader is stopped first, so calling this
        method again reconnects instead of starting a second reader.

        Args:
            rtsp: Stream URL. ``None`` selects the built-in default URL.

        Raises:
            RuntimeError: If the stream cannot be opened, or the reader
                stops before delivering a frame.
        """
        if rtsp is None:
            rtsp = Camera.__RTSP_URL
        self.close()

        handler = cv2.VideoCapture(rtsp)
        if not handler.isOpened():
            handler.release()
            raise RuntimeError("Fail To Connect Camera.")

        stop = Event()
        # Daemon thread: a blocked read() must not keep the process alive.
        worker = Thread(target=self.__read, args=(handler, stop), daemon=True)
        self.__handler, self.__stop, self.__thread = handler, stop, worker
        worker.start()

        while self.__data is None:
            if not worker.is_alive():
                # The reader died before producing a frame; do not spin forever.
                raise RuntimeError("Fail To Read Data.") from self.__error
            sleep(0.5)

    def close(self) -> None:
        """Stop the reader thread and discard the current frame.

        Safe to call repeatedly and on an instance that is not connected.
        """
        stop, worker = self.__stop, self.__thread
        if stop is not None:
            stop.set()
        if worker is not None:
            worker.join(timeout=Camera.__JOIN_TIMEOUT)
        self.__stop = None
        self.__thread = None
        self.__error = None
        self.__data = None

    def read_current(self) -> Optional[np.ndarray]:
        """Return the most recent frame, or ``None`` if none is available."""
        return self.__data

    def save_current_to(self, path: Optional[str] = None, compress: float = -1) -> None:
        """Write the most recent frame to *path*.

        Args:
            path: Destination file. ``None`` selects
                ``./images/<local timestamp>.jpg``.
            compress: Ratio passed to :meth:`resize` before writing;
                ``-1`` writes the frame at its original size.

        Raises:
            RuntimeError: If no frame is available or the file cannot be
                written.
        """
        frame = self.__data  # snapshot: the reader thread may replace it
        if frame is None:
            raise RuntimeError("No frame available to save.")
        if path is None:
            path = f"./images/{self.__current_time()}.jpg"
        directory = os.path.dirname(path)
        if directory:
            # cv2.imwrite does not create missing directories.
            os.makedirs(directory, exist_ok=True)
        # cv2.imwrite reports failure through its return value only.
        if not cv2.imwrite(path, self.resize(frame, compress)):
            raise RuntimeError(f"Fail To Save Image: {path}")

    def read_stream(self, time_gap: float = 1.0, total: int = -1) -> Iterator[np.ndarray]:
        """Yield the most recent frame repeatedly.

        Args:
            time_gap: Seconds to sleep between yielded frames; values
                ``<= 0`` disable the pause.
            total: Number of frames to yield. ``-1`` yields indefinitely;
                any other value ``<= 0`` yields nothing.

        Yields:
            The frame that is current at the time of each iteration.

        Raises:
            RuntimeError: If the reader thread stopped because a read failed.

        The generator ends early once the camera has been closed.
        """
        endless = total == -1
        remaining = total
        while endless or remaining > 0:
            frame = self.__data
            if frame is None:
                if self.__error is not None:
                    raise RuntimeError("Fail To Read Data.") from self.__error
                return  # closed: nothing more to deliver
            yield frame
            if not endless:
                remaining -= 1
                if remaining <= 0:
                    return  # no pause after the final frame
            if time_gap > 0:
                sleep(time_gap)


def main():
    """Show detection results for the live camera stream until interrupted."""
    ratio = 3  # shrink factor applied before upload, undone before display
    camera = Camera()  # connects to the default RTSP URL
    try:
        for frame in camera.read_stream(0):  # no pause between frames
            small = camera.resize(frame, ratio)  # divide width and height by `ratio`
            detected = camera.detr(small)
            detected = camera.resize(detected, 1 / ratio)  # back to original scale
            cv2.imshow("detr", detected)  # display the detection result
            cv2.waitKey(1)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the expected way to end the endless stream
    finally:
        camera.close()


if __name__ == "__main__":
    main()