import cv2 import numpy as np from .lib import DETR_HOST from threading import Thread from requests import request from time import sleep, strftime, localtime __all__ = ["Rtsp"] class Rtsp: def __init__(self): self.__handler, self.__data = None, None self.__reading: "bool" = False @staticmethod def __current_time(fmt: "str" = "%Y-%m-%d_%H-%M-%S") -> "str": return strftime(fmt, localtime()) @staticmethod def resize(img: "np.ndarray", ratio: "int" = -1) -> "np.ndarray": if ratio == -1: return img h, w = img.shape[:2] tar_shape = int(w / ratio), int(h / ratio) return cv2.resize(img, tar_shape, interpolation=cv2.INTER_AREA) @staticmethod def detr(img: "np.ndarray") -> "np.ndarray": _, img = cv2.imencode(".jpg", img) # noqa files = {"img": ("from-python.jpg", img.tobytes(), "image/jpg")} resp = request("POST", DETR_HOST, files=files) img = cv2.imdecode(np.frombuffer(resp.content, np.uint8), 1) return img def __write(self): width = int(self.__handler.get(3)) height = int(self.__handler.get(4)) fps = int(self.__handler.get(5)) print(width, height, fps) fourcc = cv2.VideoWriter_fourcc(*'XVID') writer = cv2.VideoWriter("./live.mp4", fourcc, fps, (width, height)) ret, frame = self.__handler.read() while ret: writer.write(frame) ret, frame = self.__handler.read() def __read(self) -> "None": while True: if not self.__reading: self.__handler.release() self.__handler = None break ret, self.__data = self.__handler.read() if not ret: raise RuntimeError("Fail To Read Data.") def connect(self, rtsp: "str") -> "None": self.__handler = cv2.VideoCapture(rtsp) if not self.__handler.isOpened(): raise RuntimeError("Fail To Connect Camera.") self.__reading = True Thread(target=self.__read, name="RTSP reader").start() # Thread(target=self.__write, name="RTSP writer").start() while self.__data is None: sleep(0.5) def close(self) -> "None": self.__reading = False self.__data = None def read_current(self) -> "np.ndarray": return self.__data def save_current_to(self, path: "str", compress: "int" = -1) -> None: if compress == -1: cv2.imwrite(path, self.__data) else: cv2.imwrite(path, self.resize(self.__data, compress)) def read_stream(self, time_gap: "float" = 1.0, total: "int" = -1): if total == -1: while True: yield self.__data if time_gap > 0: sleep(time_gap) else: while total > 0: yield self.__data total -= 1 if total > 0 and time_gap > 0: sleep(time_gap)