123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- package lib
- import (
- "fmt"
- "log"
- "net"
- "strings"
- "sync"
- "time"
- )
// Server is the client end of a TCP connection to the socket server. It
// frames outgoing events, receives incoming ones and reports upload requests
// to its owner through a channel.
type Server struct {
	// logger receives diagnostics, including the fatal ones.
	logger *log.Logger
	// cameraId is the identifier sent back when the peer asks for "camera-id".
	cameraId string
	// alive is the keep-alive period as a plain count of seconds; it is
	// multiplied by time.Second before being slept on.
	alive time.Duration
	// sock is the open connection, or nil before Init and after Stop.
	sock net.Conn
	// lock serialises writes to sock.
	lock sync.Mutex
	// upload receives true for an "image" event and false for "close".
	upload chan bool
}
- func (ser *Server) Init(host string, port int, alive int, logger *log.Logger, id string) {
- sock, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port))
- if err != nil {
- ser.logger.Fatal("Fail to connect socket server")
- }
- ser.sock, ser.alive = sock, time.Duration(alive)
- ser.logger, ser.cameraId = logger, id
- }
- func (ser *Server) Run(upload chan bool) {
- ser.upload = upload
- go ser.keepAliveThread()
- go ser.transportThread()
- }
- func (ser *Server) Emit(event string, data []byte) {
- if ser.sock != nil {
- message := []byte(fmt.Sprintf("==EVENT:%s==", event))
- message = append(message, data...)
- message = append(message, []byte("==END==")...)
- ser.lock.Lock()
- _, err := ser.sock.Write(message)
- ser.lock.Unlock()
- if err != nil {
- ser.logger.Fatalf("Failed to emit on event: %s.", event)
- }
- }
- }
- func (ser *Server) Stop() {
- if ser.sock == nil {
- return
- }
- _ = ser.sock.Close()
- ser.sock = nil
- }
- func (ser *Server) transportThread() {
- defer ser.Stop()
- buffer, cur := "", make([]byte, BlockSize)
- for {
- n, err := ser.sock.Read(cur)
- if err != nil {
- ser.logger.Fatal("server data receive error", err)
- }
- buffer += string(cur[:n])
- for {
- if index := strings.Index(buffer, SockEnd); index != -1 {
- pool := buffer[:index]
- buffer = buffer[index+SockEndLen:]
- if index = strings.Index(pool, SockStart); index != -1 {
- pool = pool[index+SockStartLen:]
- index = strings.Index(pool, SockDelim)
- go ser.eventHandler(pool[:index], pool[SockDelimLen:])
- }
- } else {
- break
- }
- }
- }
- }
- func (ser *Server) eventHandler(event string, data string) { // data maybe used in future use-case
- switch event {
- case "pon":
- break
- case "camera-id":
- ser.Emit("camera-id", []byte(ser.cameraId))
- break
- case "image":
- ser.upload <- true
- break
- case "close":
- ser.upload <- false
- break
- default:
- ser.logger.Printf("unrecognized event: %s, with data: %s\n", event, data[:10])
- }
- }
- func (ser *Server) keepAliveThread() {
- defer ser.Stop()
- for {
- time.Sleep(time.Second * ser.alive)
- ser.Emit("pin", []byte(fmt.Sprintf("%d", time.Now().Unix())))
- }
- }
|