server-test.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. import time
  2. from socks.server import SocketServer
  3. from decoder.rtp import RTP
  4. from decoder.h265 import H265Decoder
  # Server instance that every handler in this script registers on and emits
  # through. NOTE(review): the two arguments are presumably the host and port
  # to bind -- confirm against socks.server.SocketServer.
  ss = SocketServer("192.168.1.6", 6010)
  @ss.on("connect")
  def conn(client, addr):
      """Drive a freshly connected client through one capture session.

      Emits "camera-id", pauses briefly, emits "image", lets the session run,
      and finally emits "close". Each ``time.sleep`` blocks this handler.

      Args:
          client: Handle passed back to ``ss.emit`` to address this client.
          addr: Peer address; only used in the log line.
      """
      settle_seconds = 1
      # NOTE(review): the log line below says "3s" but the wait is 60 seconds;
      # confirm which of the two is intended.
      capture_seconds = 60

      print(f"{addr} is connected")
      ss.emit(client, "camera-id")
      time.sleep(settle_seconds)

      print("receive 3s image data")
      ss.emit(client, "image")
      time.sleep(capture_seconds)

      ss.emit(client, "close")
      print("stop image transport")
  @ss.on("pin")
  def pin(client, data):
      """Log a "pin" event and send its payload back as a "pon" event.

      Args:
          client: Handle passed back to ``ss.emit`` to address the sender.
          data: Payload of the "pin" event, echoed unchanged.
      """
      log_line = f"got pin, data={data}"
      print(log_line)
      ss.emit(client, "pon", data)
  class SpeedTester:
      """Accumulate byte counts and print the running average transfer rate.

      The first call to :meth:`new` records the start timestamp. Every later
      call prints the total number of bytes reported so far (including the
      first chunk) divided by the time elapsed since that first call, in KB/s.
      """

      def __init__(self):
          # Total bytes reported through new() so far.
          self.length = 0
          # Timestamp of the first new() call; None until that call happens.
          self.last_time = None

      def new(self, length: "int", now: "float | None" = None):
          """Record ``length`` more bytes and print the average rate so far.

          Args:
              length: Number of bytes in the chunk that just arrived.
              now: Timestamp of this sample in seconds. Defaults to
                  ``time.time()``; pass it explicitly to replay recorded
                  arrival times.

          Returns:
              None. The result is reported on standard output only.
          """
          if now is None:
              now = time.time()
          self.length += length

          if self.last_time is None:
              self.last_time = now
              print(f"start time: {self.last_time}")
              return

          elapsed = now - self.last_time
          if elapsed <= 0:
              # Two samples can share a timestamp on a coarse clock, and the
              # wall clock can step backwards; dividing would raise
              # ZeroDivisionError or print a meaningless negative rate.
              return
          print(f"rate: {self.length / elapsed / 1024:.3f} KB/s")
  # Throughput measurement is currently disabled; re-enable with:
  # st = SpeedTester()

  # Single decoder instance that the "image" handler passes every RTP packet to.
  decoder = H265Decoder()
  @ss.on("image")
  def image(client, data):
      """Unwrap one framed packet from an "image" event and decode it.

      ``data`` starts with a four-byte control header whose bytes 2-3 hold the
      payload size as a big-endian unsigned integer. The payload that follows
      is wrapped in ``RTP`` and passed to the module-level ``decoder``.
      Messages that are too short for the header, or that do not yet contain
      the whole declared payload, are ignored.

      Args:
          client: Handle of the sending client (unused here).
          data: Raw bytes-like message: control header followed by payload.
      """
      header_len = 4
      if len(data) < header_len:
          # Without this guard, reading header bytes 2 and 3 raises IndexError.
          return

      size = int.from_bytes(data[2:header_len], "big")
      frame_end = header_len + size
      if frame_end > len(data):
          # Declared payload is not fully present; nothing to decode.
          return

      # Pass exactly the declared payload so that any bytes after this frame
      # are not treated as part of the RTP packet.
      rtp = RTP(data[header_len:frame_end])
      decoder.parse(rtp)
      # TODO: unfinished
  @ss.on("camera-id")
  def info(client, data):
      """Log the payload received with a "camera-id" event.

      Args:
          client: Handle of the sending client (unused here).
          data: Payload of the event; printed as-is.
      """
      log_line = f"got info data: {data}"
      print(log_line)
  # Start the server only when this file is run as a script, so importing it
  # registers the handlers without starting anything.
  if __name__ == "__main__":
      ss.start()