123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
import os
from threading import Thread
from time import localtime, strftime, sleep

import cv2
import numpy as np
from requests import request
class Camera:
    """Keep the most recent frame of an RTSP camera available to callers.

    ``connect`` opens the stream with ``cv2.VideoCapture`` and starts a
    background thread that keeps overwriting an internal "current frame".
    ``read_current``, ``read_stream`` and ``save_current_to`` all serve that
    frame, so they never block on the network themselves.

    Stream URL format::

        rtsp://<user>:<password>@<host>:554/cam/realmonitor?channel=1&subtype=0
    """

    # NOTE(security): the default URL embeds the camera credentials in source.
    # It is kept so that ``Camera()`` keeps working unchanged; prefer passing
    # ``rtsp`` explicitly and loading the secret from configuration.
    __RTSP_URL = "rtsp://admin:hmkj6688@192.168.1.254:554/cam/realmonitor?channel=1&subtype=0"
    # Endpoint used by ``detr`` when the caller does not supply one.
    __DETR_URL = "http://192.168.1.48:5080/"

    def __init__(self, rtsp: "str | None" = None):
        """Create the camera and connect immediately.

        Args:
            rtsp: Stream URL; ``None`` selects the built-in default URL.

        Raises:
            RuntimeError: If the stream cannot be opened or no frame can be
                read from it.
        """
        self.__handler, self.__data = None, None
        self.__reading: "bool" = False
        self.__worker = None
        self.__failed: "bool" = False
        self.connect(rtsp)

    @staticmethod
    def __current_time(fmt: "str" = "%Y-%m-%d_%H-%M-%S") -> "str":
        """Return the local time formatted with ``fmt``."""
        return strftime(fmt, localtime())

    @staticmethod
    def resize(img: "np.ndarray", ratio: "float" = -1) -> "np.ndarray":
        """Scale ``img`` so that both sides are divided by ``ratio``.

        Args:
            img: Image array whose first two dimensions are height and width.
            ratio: Divisor for width and height. ``-1`` (the default) means
                "do not resize" and returns ``img`` itself; values below 1
                enlarge the image.

        Returns:
            The resized image, or ``img`` unchanged when ``ratio`` is ``-1``.

        Raises:
            ValueError: If ``ratio`` is zero or negative (other than ``-1``).
        """
        if ratio == -1:
            return img
        if ratio <= 0:
            raise ValueError(f"ratio must be positive or -1, got {ratio!r}")
        h, w = img.shape[:2]
        # Clamp to one pixel: cv2.resize rejects a zero-sized target, which a
        # large ratio on a small image would otherwise produce.
        tar_shape = max(1, int(w / ratio)), max(1, int(h / ratio))
        return cv2.resize(img, tar_shape, interpolation=cv2.INTER_AREA)

    @staticmethod
    def detr(img: "np.ndarray", url: "str | None" = None,
             timeout: "float" = 30.0) -> "np.ndarray":
        """Send ``img`` to the HTTP service and return the image it replies with.

        The frame is JPEG-encoded, posted as the multipart field ``img``, and
        the response body is decoded back into a colour image.

        Args:
            img: Image to upload.
            url: Service endpoint; ``None`` selects the built-in default.
            timeout: Seconds to wait for the service before giving up.

        Returns:
            The decoded image contained in the response body.

        Raises:
            RuntimeError: If the image cannot be encoded or the response body
                is not a decodable image.
            requests.RequestException: On timeout, connection failure or a
                non-success HTTP status.
        """
        if url is None:
            url = Camera.__DETR_URL
        encoded_ok, encoded = cv2.imencode(".jpg", img)  # noqa
        if not encoded_ok:
            raise RuntimeError("Fail To Encode Image.")
        files = {"img": ("from-python.jpg", encoded.tobytes(), "image/jpg")}
        # Without a timeout a stalled service would block the caller forever.
        resp = request("POST", url, files=files, timeout=timeout)
        resp.raise_for_status()
        result = cv2.imdecode(np.frombuffer(resp.content, np.uint8), cv2.IMREAD_COLOR)
        if result is None:
            raise RuntimeError("Fail To Decode Response Image.")
        return result

    def __read(self) -> "None":
        """Reader-thread body: publish frames until closed or a read fails.

        A failed read is recorded in ``__failed`` instead of being raised
        here, because an exception in a background thread never reaches the
        caller. The capture handle is always released on exit.
        """
        handler = self.__handler
        try:
            while self.__reading:
                ret, frame = handler.read()
                if not ret:
                    self.__failed = True
                    self.__data = None
                    break
                self.__data = frame
        finally:
            handler.release()

    def connect(self, rtsp: "str" = None) -> "None":
        """Open ``rtsp`` and block until the first frame has arrived.

        Any previous session of this object is closed first, so calling
        ``connect`` repeatedly does not leak capture handles or threads.

        Args:
            rtsp: Stream URL; ``None`` selects the built-in default URL.

        Raises:
            RuntimeError: If the stream cannot be opened, or if reading fails
                before a first frame is available.
        """
        if rtsp is None:
            rtsp = Camera.__RTSP_URL
        self.close()
        handler = cv2.VideoCapture(rtsp)
        if not handler.isOpened():
            handler.release()
            raise RuntimeError("Fail To Connect Camera.")
        self.__handler = handler
        self.__reading = True
        # Daemon thread: a crash in the main thread must not leave the
        # process hanging on a reader that loops forever.
        self.__worker = Thread(target=self.__read, daemon=True)
        self.__worker.start()
        while self.__data is None:
            if self.__failed:
                # The reader stopped without delivering a frame; waiting any
                # longer would hang forever.
                self.close()
                raise RuntimeError("Fail To Read Data.")
            sleep(0.5)

    def close(self) -> "None":
        """Stop the reader thread and forget the current frame.

        Blocks until the reader's in-flight read has returned, so that a late
        frame cannot reappear after the camera was closed. Safe to call more
        than once.
        """
        self.__reading = False
        worker = self.__worker
        if worker is not None and worker.is_alive():
            worker.join()
        self.__worker = None
        self.__handler = None  # already released by the reader thread
        self.__data = None
        self.__failed = False

    def read_current(self) -> "np.ndarray":
        """Return the most recent frame, or ``None`` after ``close``.

        Raises:
            RuntimeError: If the reader thread stopped because a read failed.
        """
        if self.__failed:
            raise RuntimeError("Fail To Read Data.")
        return self.__data

    def save_current_to(self, path: "str" = None, compress: "int" = -1) -> None:
        """Write the current frame to disk.

        Args:
            path: Target file; defaults to ``./images/<timestamp>.jpg``.
            compress: Ratio passed to ``resize`` before writing; ``-1`` keeps
                the original size.

        Raises:
            RuntimeError: If there is no frame to save or the file cannot be
                written.
        """
        frame = self.read_current()
        if frame is None:
            raise RuntimeError("Fail To Read Data.")
        if path is None:
            path = f"./images/{self.__current_time()}.jpg"
        folder = os.path.dirname(path)
        if folder:
            # cv2.imwrite does not create missing directories.
            os.makedirs(folder, exist_ok=True)
        # resize() returns the frame untouched when compress is -1.
        if not cv2.imwrite(path, self.resize(frame, compress)):  # noqa
            raise RuntimeError("Fail To Save Image.")

    def read_stream(self, time_gap: "float" = 1.0, total: "int" = -1):
        """Yield the current frame repeatedly.

        Args:
            time_gap: Seconds to sleep between two yields; values ``<= 0``
                disable the pause.
            total: Number of frames to yield; ``-1`` yields without end, and
                ``0`` or any other negative value yields nothing.

        Yields:
            Whatever ``read_current`` returns at that moment (``None`` once
            the camera has been closed).

        Raises:
            RuntimeError: If the reader thread stopped because a read failed.
        """
        endless = total == -1
        while endless or total > 0:
            yield self.read_current()
            if not endless:
                total -= 1
                if total <= 0:
                    # No trailing sleep after the last requested frame.
                    return
            if time_gap > 0:
                sleep(time_gap)
def main():
    """Show the detection service's output for the live camera stream.

    Each frame is shrunk before upload and the returned image is scaled back
    up for display. The loop ends when 'q' or Esc is pressed in the window,
    on Ctrl+C, or when no frame is available any more; the camera and the
    display windows are cleaned up in every case.
    """
    camera = Camera()  # connects to the class's built-in default stream
    ratio = 3  # shrink factor applied before upload and undone for display
    try:
        for frame in camera.read_stream(0):  # 0 = no pause between frames
            if frame is None:
                # Nothing left to show (camera closed or reader stopped).
                break
            small = camera.resize(frame, ratio)
            detected = camera.detr(small)
            detected = camera.resize(detected, 1 / ratio)
            cv2.imshow("detr", detected)
            # waitKey also services the window's event loop; -1 (no key)
            # masks to 255 and therefore never matches a quit key.
            if cv2.waitKey(1) & 0xFF in (ord("q"), 27):
                break
    except KeyboardInterrupt:
        pass
    finally:
        # Previously close() sat after an endless loop and was skipped on any
        # exception, leaving the reader thread running.
        camera.close()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
|