# can-work.py (3.3 KB)
import os
from threading import Thread
from time import localtime, strftime, sleep

import cv2
import numpy as np
from requests import request
  # ---- Camera: background-threaded RTSP frame reader ----
class Camera:
    """Keep the most recent frame of an RTSP stream available in memory.

    A background thread reads frames from a ``cv2.VideoCapture`` in a loop and
    stores each one; the accessor methods return whatever frame was stored
    last.  The stored frame is ``None`` after a failed read and after
    :meth:`close`.

    NOTE(security): the default stream URL below embeds camera credentials in
    source code.  Prefer passing ``rtsp`` to the constructor or to
    :meth:`connect` (e.g. loaded from configuration).
    """

    # Alternate camera endpoint recorded by the original author:
    # rtsp://admin:hmkj6688@192.168.1.201:554/cam/realmonitor?channel=1&subtype=0
    __RTSP_URL = "rtsp://admin:hmkj6688@192.168.1.254:554/cam/realmonitor?channel=1&subtype=0"

    # Class-level defaults so that every method is safe to call even on an
    # instance that has not (yet) been connected.
    __handler = None
    __data = None
    __reading = False
    __thread = None

    def __init__(self, rtsp: "str | None" = None):
        """Connect to ``rtsp`` (the built-in default URL when ``None``).

        Blocks until the first frame has been read.

        Raises:
            RuntimeError: if the stream cannot be opened or read.
        """
        self.__handler, self.__data = None, None
        self.__reading: "bool" = False
        self.__thread = None
        self.connect(rtsp)

    @staticmethod
    def __current_time(fmt: "str" = "%Y-%m-%d_%H-%M-%S") -> "str":
        """Return the local time formatted with ``fmt``."""
        return strftime(fmt, localtime())

    @staticmethod
    def resize(img: "np.ndarray", ratio: "float" = -1) -> "np.ndarray":
        """Scale ``img`` so that width and height are divided by ``ratio``.

        ``ratio=-1`` is a sentinel meaning "return ``img`` unchanged".  A ratio
        above 1 shrinks the image, a ratio between 0 and 1 enlarges it.

        Raises:
            ValueError: if ``ratio`` is zero or negative (other than ``-1``).
        """
        if ratio == -1:
            return img
        if ratio <= 0:
            raise ValueError(
                f"ratio must be positive (or -1 to skip resizing), got {ratio!r}"
            )
        h, w = img.shape[:2]
        # Clamp to one pixel: cv2.resize rejects a zero-sized target.
        tar_shape = max(1, int(w / ratio)), max(1, int(h / ratio))
        return cv2.resize(img, tar_shape, interpolation=cv2.INTER_AREA)

    @staticmethod
    def detr(
        img: "np.ndarray",
        url: "str" = "http://192.168.1.48:5080/",
        timeout: "float | None" = None,
    ) -> "np.ndarray":
        """POST ``img`` as a JPEG to ``url`` and decode the response as an image.

        Args:
            img: image to upload (sent as multipart field ``img``).
            url: endpoint of the remote service.
            timeout: seconds to wait for the service; ``None`` waits forever.

        Returns:
            The response body decoded as a 3-channel colour image.

        Raises:
            RuntimeError: if ``img`` cannot be JPEG-encoded or the response
                body is not a decodable image.
            requests.HTTPError: if the service answers with an error status.
        """
        ok, encoded = cv2.imencode(".jpg", img)  # noqa
        if not ok:
            raise RuntimeError("Fail To Encode Image.")
        files = {"img": ("from-python.jpg", encoded.tobytes(), "image/jpg")}
        resp = request("POST", url, files=files, timeout=timeout)
        resp.raise_for_status()
        result = cv2.imdecode(np.frombuffer(resp.content, np.uint8), cv2.IMREAD_COLOR)
        if result is None:
            raise RuntimeError("Fail To Decode Detection Result.")
        return result

    def __read(self) -> "None":
        """Reader-thread body: store frames until stopped or a read fails.

        The capture handle is released on every exit path.  A failed read
        stores ``None`` as the current frame and raises ``RuntimeError``.
        """
        handler = self.__handler
        try:
            while self.__reading:
                ret, self.__data = handler.read()
                if not ret:
                    raise RuntimeError("Fail To Read Data.")
        finally:
            handler.release()
            if self.__handler is handler:
                self.__handler = None
            self.__reading = False

    def connect(self, rtsp: "str | None" = None) -> "None":
        """Open ``rtsp`` and start the reader thread; wait for the first frame.

        Any reader that is still running is stopped first, so calling
        ``connect`` repeatedly never leaves two readers behind.

        Raises:
            RuntimeError: if the stream cannot be opened, or if the reader
                stops before delivering a first frame.
        """
        if rtsp is None:
            rtsp = Camera.__RTSP_URL
        self.close()
        handler = cv2.VideoCapture(rtsp)
        if not handler.isOpened():
            handler.release()
            raise RuntimeError("Fail To Connect Camera.")
        self.__handler = handler
        self.__reading = True
        # Daemon thread: an un-closed camera must not keep the process alive
        # after the main thread has exited.
        reader = Thread(target=self.__read, daemon=True)
        self.__thread = reader
        reader.start()
        while self.__data is None:
            # A dead reader will never deliver a frame; fail instead of
            # polling forever.
            if not reader.is_alive():
                raise RuntimeError("Fail To Read Data.")
            sleep(0.5)

    def close(self) -> "None":
        """Stop the reader thread, wait for it to finish, clear the frame."""
        self.__reading = False
        reader, self.__thread = self.__thread, None
        if reader is not None:
            # Wait for the reader so it cannot store one more frame after the
            # reset below, and so a following connect() starts from scratch.
            reader.join()
        self.__data = None

    def read_current(self) -> "np.ndarray":
        """Return the most recently stored frame (``None`` if there is none)."""
        return self.__data

    def save_current_to(self, path: "str" = None, compress: "int" = -1) -> None:
        """Write the current frame to ``path`` as an image file.

        Args:
            path: destination file; defaults to ``./images/<timestamp>.jpg``
                (the ``./images`` directory is created when missing).
            compress: shrink factor handed to :meth:`resize`; ``-1`` saves the
                frame at its original size.
        """
        if path is None:
            path = f"./images/{self.__current_time()}.jpg"
            # cv2.imwrite does not create missing directories.
            os.makedirs(os.path.dirname(path), exist_ok=True)
        # resize() returns the frame untouched for the -1 sentinel.
        cv2.imwrite(path, self.resize(self.__data, compress))  # noqa

    def read_stream(self, time_gap: "float" = 1.0, total: "int" = -1):
        """Yield the most recently stored frame over and over.

        Args:
            time_gap: seconds to sleep between two yields; values ``<= 0``
                disable the pause.
            total: number of frames to yield; ``-1`` yields forever.  No
                pause follows the final frame.
        """
        endless = total == -1
        remaining = total
        while endless or remaining > 0:
            yield self.__data
            if not endless:
                remaining -= 1
                if remaining <= 0:
                    break
            if time_gap > 0:
                sleep(time_gap)
  # ---- Script entry point ----
def main():
    """Display the detection service's output for the live camera stream.

    Each frame is shrunk before upload, sent through ``Camera.detr``, scaled
    back up and shown in a window named ``detr``.  The camera is closed and
    the window destroyed on every exit path, including Ctrl+C and errors.
    """
    camera = Camera()  # Camera bound to the fixed-IP default stream
    ratio = 3
    try:
        # time_gap=0: take frames back-to-back, with no pause between them.
        for item in camera.read_stream(0):
            item = camera.resize(item, ratio)  # shrink width and height by `ratio`
            # cv2.imshow("raw", item)  # show the raw frame
            d_img = camera.detr(item)
            d_img = camera.resize(d_img, 1 / ratio)  # scale the result back up
            cv2.imshow("detr", d_img)  # show the detection result
            cv2.waitKey(1)
    finally:
        # The loop above is endless, so this is the only place where cleanup
        # can run.
        camera.close()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()