12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- import cv2
- import numpy as np
- from .lib import DETR_HOST
- from threading import Thread
- from requests import request
- from time import sleep, strftime, localtime
- __all__ = ["Rtsp"]
- class Rtsp:
- def __init__(self):
- self.__handler, self.__data = None, None
- self.__reading: "bool" = False
- @staticmethod
- def __current_time(fmt: "str" = "%Y-%m-%d_%H-%M-%S") -> "str":
- return strftime(fmt, localtime())
- @staticmethod
- def resize(img: "np.ndarray", ratio: "int" = -1) -> "np.ndarray":
- if ratio == -1:
- return img
- h, w = img.shape[:2]
- tar_shape = int(w / ratio), int(h / ratio)
- return cv2.resize(img, tar_shape, interpolation=cv2.INTER_AREA)
- @staticmethod
- def detr(img: "np.ndarray") -> "np.ndarray":
- _, img = cv2.imencode(".jpg", img) # noqa
- files = {"img": ("from-python.jpg", img.tobytes(), "image/jpg")}
- resp = request("POST", DETR_HOST, files=files)
- img = cv2.imdecode(np.frombuffer(resp.content, np.uint8), 1)
- return img
- def __write(self):
- width = int(self.__handler.get(3))
- height = int(self.__handler.get(4))
- fps = int(self.__handler.get(5))
- print(width, height, fps)
- fourcc = cv2.VideoWriter_fourcc(*'XVID')
- writer = cv2.VideoWriter("./live.mp4", fourcc, fps, (width, height))
- ret, frame = self.__handler.read()
- while ret:
- writer.write(frame)
- ret, frame = self.__handler.read()
- def __read(self) -> "None":
- while True:
- if not self.__reading:
- self.__handler.release()
- self.__handler = None
- break
- ret, self.__data = self.__handler.read()
- if not ret:
- raise RuntimeError("Fail To Read Data.")
- def connect(self, rtsp: "str") -> "None":
- self.__handler = cv2.VideoCapture(rtsp)
- if not self.__handler.isOpened():
- raise RuntimeError("Fail To Connect Camera.")
- self.__reading = True
- Thread(target=self.__read, name="RTSP reader").start()
- # Thread(target=self.__write, name="RTSP writer").start()
- while self.__data is None:
- sleep(0.5)
- def close(self) -> "None":
- self.__reading = False
- self.__data = None
- def read_current(self) -> "np.ndarray":
- return self.__data
- def save_current_to(self, path: "str", compress: "int" = -1) -> None:
- if compress == -1:
- cv2.imwrite(path, self.__data)
- else:
- cv2.imwrite(path, self.resize(self.__data, compress))
- def read_stream(self, time_gap: "float" = 1.0, total: "int" = -1):
- if total == -1:
- while True:
- yield self.__data
- if time_gap > 0:
- sleep(time_gap)
- else:
- while total > 0:
- yield self.__data
- total -= 1
- if total > 0 and time_gap > 0:
- sleep(time_gap)
|