// Package lib contains Server, a TCP client that exchanges framed events with
// a remote socket server.
//
// The framing constants used by the receive path (BlockSize, SockStart,
// SockStartLen, SockDelim, SockDelimLen, SockEnd, SockEndLen) are declared
// elsewhere in this package.
package lib

import (
	"fmt"
	"log"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)

// Server wraps a single TCP connection to the remote socket server.
//
// Fields:
//   - logger:   destination for diagnostics; fatal errors are reported through it.
//   - cameraId: identifier sent back when the peer emits a "camera-id" event.
//   - alive:    keep-alive interval expressed as a plain count of seconds
//     (it is multiplied by time.Second at the point of use).
//   - sock:     the live connection, or nil once Stop has run.
//   - lock:     guards sock and serializes writes to it.
//   - upload:   receives true for an "image" event and false for a "close" event.
type Server struct {
	logger   *log.Logger
	cameraId string
	alive    time.Duration
	sock     net.Conn
	lock     sync.Mutex
	upload   chan bool
}

// Init stores the configuration and dials host:port over TCP.
//
// alive is the keep-alive interval in seconds, logger receives all
// diagnostics, and id is the camera identifier reported to the peer.
// If the connection cannot be established the failure is logged with
// logger.Fatalln, which terminates the process.
func (ser *Server) Init(host string, port int, alive int, logger *log.Logger, id string) {
	// Install the logger before dialing: the failure path below needs it, and
	// using it while still unset would crash on a nil logger instead of
	// reporting the dial error.
	ser.logger, ser.cameraId = logger, id
	ser.alive = time.Duration(alive)

	// JoinHostPort brackets IPv6 literals, which a plain "%s:%d" does not.
	sock, err := net.Dial("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
	if err != nil {
		ser.logger.Fatalln("Fail to connect socket server", err)
	}

	ser.lock.Lock()
	ser.sock = sock
	ser.lock.Unlock()
}

// Run records the upload channel and starts the keep-alive and receive
// goroutines. It returns immediately.
func (ser *Server) Run(upload chan bool) {
	ser.upload = upload
	go ser.keepAliveThread()
	go ser.transportThread()
}

// Emit sends one frame of the form "==EVENT:<event>==<data>==END==" to the
// peer. It is a no-op when the connection is not open. A write failure is
// logged with Fatalf, which terminates the process.
func (ser *Server) Emit(event string, data []byte) {
	message := []byte(fmt.Sprintf("==EVENT:%s==", event))
	message = append(message, data...)
	message = append(message, []byte("==END==")...)

	// The nil check and the write happen under the same lock so that a
	// concurrent Stop cannot clear sock between them.
	ser.lock.Lock()
	if ser.sock == nil {
		ser.lock.Unlock()
		return
	}
	_, err := ser.sock.Write(message)
	ser.lock.Unlock()

	if err != nil {
		ser.logger.Fatalf("Failed to emit on event: %s.", event)
	}
}

// Stop closes the connection, if any, and marks the server as stopped.
// Calling it more than once is harmless.
func (ser *Server) Stop() {
	ser.lock.Lock()
	defer ser.lock.Unlock()

	if ser.sock == nil {
		return
	}
	// Best-effort close: there is nothing useful to do if it fails.
	_ = ser.sock.Close()
	ser.sock = nil
}

// conn returns the current connection, or nil when the server is stopped or
// was never connected.
func (ser *Server) conn() net.Conn {
	ser.lock.Lock()
	defer ser.lock.Unlock()
	return ser.sock
}

// transportThread reads from the connection, accumulates the bytes and
// dispatches every complete frame to eventHandler. It returns when the server
// has been stopped; any other read error is logged with Fatalln, which
// terminates the process (deferred calls do not run in that case).
func (ser *Server) transportThread() {
	defer ser.Stop()

	// Work on a snapshot so a concurrent Stop (which sets sock to nil) cannot
	// cause a nil dereference here; Close still unblocks the pending Read.
	conn := ser.conn()
	if conn == nil {
		return
	}

	buffer, cur := "", make([]byte, BlockSize)
	for {
		n, err := conn.Read(cur)
		if err != nil {
			if ser.conn() == nil {
				// The read failed because Stop closed the socket: exit quietly.
				return
			}
			ser.logger.Fatalln("server data receive error", err)
		}
		buffer = ser.dispatchFrames(buffer + string(cur[:n]))
	}
}

// dispatchFrames extracts every complete frame (terminated by SockEnd) from
// buffer, hands each well-formed one to eventHandler on its own goroutine and
// returns the unconsumed remainder.
func (ser *Server) dispatchFrames(buffer string) string {
	for {
		end := strings.Index(buffer, SockEnd)
		if end == -1 {
			return buffer
		}
		frame := buffer[:end]
		buffer = buffer[end+SockEndLen:]

		start := strings.Index(frame, SockStart)
		if start == -1 {
			// No frame header before the terminator: discard the fragment.
			continue
		}
		frame = frame[start+SockStartLen:]

		delim := strings.Index(frame, SockDelim)
		if delim == -1 {
			// Without the delimiter the event name cannot be separated from
			// the payload; slicing with -1 would panic, so drop the frame.
			ser.logger.Printf("malformed frame dropped: missing delimiter (%d bytes)\n", len(frame))
			continue
		}
		// The payload starts after the delimiter, not at a fixed offset.
		go ser.eventHandler(frame[:delim], frame[delim+SockDelimLen:])
	}
}

// eventHandler reacts to one event received from the peer.
//
//   - "pon":       ignored (presumably the reply to the "pin" keep-alive; confirm with the peer protocol).
//   - "camera-id": replies with the configured camera identifier.
//   - "image":     sends true on the upload channel.
//   - "close":     sends false on the upload channel.
//
// Any other event is logged together with at most the first 10 bytes of data.
func (ser *Server) eventHandler(event string, data string) {
	// data maybe used in future use-case
	switch event {
	case "pon":
	case "camera-id":
		ser.Emit("camera-id", []byte(ser.cameraId))
	case "image":
		ser.upload <- true
	case "close":
		ser.upload <- false
	default:
		// Clamp the preview: payloads shorter than the limit must not panic.
		const previewLen = 10
		preview := data
		if len(preview) > previewLen {
			preview = preview[:previewLen]
		}
		ser.logger.Printf("unrecognized event: %s, with data: %s\n", event, preview)
	}
}

// keepAliveThread emits a "pin" event carrying the current Unix timestamp
// every ser.alive seconds until the server is stopped.
func (ser *Server) keepAliveThread() {
	defer ser.Stop()
	for {
		time.Sleep(time.Second * ser.alive)
		if ser.conn() == nil {
			// Stopped (or never connected): end the goroutine instead of
			// sleeping forever with nothing to send.
			return
		}
		ser.Emit("pin", []byte(fmt.Sprintf("%d", time.Now().Unix())))
	}
}