12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- import time
- from socks.server import SocketServer
- from decoder.rtp import RTP
- from decoder.h265 import H265Decoder
# Module-wide server instance; the handlers below register on it via
# ``@ss.on(<event>)`` and reply through ``ss.emit(...)``.
ss = SocketServer(
    "192.168.1.6",
    6010,
)
@ss.on("connect")
def conn(client, addr):
    """Run the fixed request sequence for a newly connected client.

    Emits "camera-id", waits 1 second, emits "image", waits 60 seconds,
    then emits "close". Progress is reported with ``print``.

    NOTE(review): the log line says "3s" while the code sleeps 60 seconds;
    confirm which one is intended.
    """

    def request(event):
        # Every step targets the same client; only the event name differs.
        ss.emit(client, event)

    print(f"{addr} is connected")
    request("camera-id")
    time.sleep(1)

    print("receive 3s image data")
    request("image")
    time.sleep(60)

    request("close")
    print("stop image transport")
@ss.on("pin")
def pin(client, data):
    """Log a "pin" payload and send it back to the client as "pon"."""
    log_line = f"got pin, data={data}"
    print(log_line)
    ss.emit(client, "pon", data)
class SpeedTester:
    """Accumulate reported byte counts and print the average throughput.

    The first call to :meth:`new` starts the measurement window (and its
    length is already counted in the total). Every later call prints the
    running average, in KB/s, over the time elapsed since that first call.

    Args:
        clock: Zero-argument callable returning the current time in seconds.
            Defaults to ``time.time``; pass a different callable to get
            deterministic timings.
    """

    def __init__(self, clock=time.time):
        # Total number of bytes reported so far.
        self.length = 0
        # Timestamp of the first call to new(); None until that call.
        self.last_time = None
        self._clock = clock

    def new(self, length: int) -> None:
        """Record ``length`` more bytes and print the current average rate.

        Args:
            length: Number of bytes just received.
        """
        now = self._clock()
        self.length += length

        if self.last_time is None:
            # First sample: only open the measurement window.
            self.last_time = now
            print(f"start time: {self.last_time}")
            return

        elapsed = now - self.last_time
        if elapsed <= 0:
            # Two samples within the clock's resolution (or a clock that
            # stepped backwards) give no meaningful rate; skip the report
            # instead of raising ZeroDivisionError.
            return

        print(f"rate: {self.length / elapsed / 1024:.3f} KB/s")
# Throughput measurement is currently switched off:
# st = SpeedTester()

# Decoder instance used by the "image" handler for every incoming packet.
decoder = H265Decoder()
@ss.on("image")
def image(client, data):
    """Handle an "image" packet: strip the 4-byte header and decode the RTP payload.

    Bytes 2 and 3 of the header are read as a big-endian payload length.
    Packets shorter than the header, or whose declared payload is not fully
    present in ``data``, are ignored.

    Args:
        client: The sending client (unused here).
        data: Raw packet bytes; indexing must yield integers (bytes-like).
    """
    header_len = 4
    if len(data) < header_len:
        # Too short to even hold the length field; indexing would raise.
        return

    size = (data[2] << 8) | data[3]
    payload_end = header_len + size
    if payload_end > len(data):
        # Declared payload is not fully present.
        return

    # Pass only the declared payload so trailing bytes never reach the RTP
    # parser. NOTE(review): assumes ``size`` counts exactly the payload
    # bytes that follow the header - confirm against the sender.
    rtp = RTP(data[header_len:payload_end])
    decoder.parse(rtp)
    # TODO: not finished
@ss.on("camera-id")
def info(client, data):
    """Log the payload the client sends for the "camera-id" event."""
    report = f"got info data: {data}"
    print(report)
if __name__ == "__main__":
    # Start the server only when this file is run as a script.
    ss.start()
|