server.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package lib
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "strings"
  7. "sync"
  8. "time"
  9. )
  // Server is a client-side handle on a TCP connection to the socket server.
  // It sends framed events with Emit and dispatches received events from
  // background goroutines started by Run.
  type Server struct {
  	// logger receives diagnostics and fatal errors from every method.
  	logger *log.Logger

  	// cameraId is sent back to the peer when a "camera-id" event arrives.
  	cameraId string

  	// alive is the keep-alive period expressed as a bare count; the
  	// keep-alive loop multiplies it by time.Second before sleeping.
  	alive time.Duration

  	// sock is the connection opened by Init; Stop resets it to nil.
  	sock net.Conn

  	// lock is held by Emit around each write to sock.
  	lock sync.Mutex

  	// upload receives true for an "image" event and false for a "close"
  	// event; it is supplied by the caller of Run.
  	upload chan bool
  }
  18. func (ser *Server) Init(host string, port int, alive int, logger *log.Logger, id string) {
  19. sock, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port))
  20. if err != nil {
  21. ser.logger.Fatal("Fail to connect socket server")
  22. }
  23. ser.sock, ser.alive = sock, time.Duration(alive)
  24. ser.logger, ser.cameraId = logger, id
  25. }
  26. func (ser *Server) Run(upload chan bool) {
  27. ser.upload = upload
  28. go ser.keepAliveThread()
  29. go ser.transportThread()
  30. }
  31. func (ser *Server) Emit(event string, data []byte) {
  32. if ser.sock != nil {
  33. message := []byte(fmt.Sprintf("==EVENT:%s==", event))
  34. message = append(message, data...)
  35. message = append(message, []byte("==END==")...)
  36. ser.lock.Lock()
  37. _, err := ser.sock.Write(message)
  38. ser.lock.Unlock()
  39. if err != nil {
  40. ser.logger.Fatalf("Failed to emit on event: %s.", event)
  41. }
  42. }
  43. }
  44. func (ser *Server) Stop() {
  45. if ser.sock == nil {
  46. return
  47. }
  48. _ = ser.sock.Close()
  49. ser.sock = nil
  50. }
  51. func (ser *Server) transportThread() {
  52. defer ser.Stop()
  53. buffer, cur := "", make([]byte, BlockSize)
  54. for {
  55. n, err := ser.sock.Read(cur)
  56. if err != nil {
  57. ser.logger.Fatal("server data receive error", err)
  58. }
  59. buffer += string(cur[:n])
  60. for {
  61. if index := strings.Index(buffer, SockEnd); index != -1 {
  62. pool := buffer[:index]
  63. buffer = buffer[index+SockEndLen:]
  64. if index = strings.Index(pool, SockStart); index != -1 {
  65. pool = pool[index+SockStartLen:]
  66. index = strings.Index(pool, SockDelim)
  67. go ser.eventHandler(pool[:index], pool[SockDelimLen:])
  68. }
  69. } else {
  70. break
  71. }
  72. }
  73. }
  74. }
  75. func (ser *Server) eventHandler(event string, data string) { // data maybe used in future use-case
  76. switch event {
  77. case "pon":
  78. break
  79. case "camera-id":
  80. ser.Emit("camera-id", []byte(ser.cameraId))
  81. break
  82. case "image":
  83. ser.upload <- true
  84. break
  85. case "close":
  86. ser.upload <- false
  87. break
  88. default:
  89. ser.logger.Printf("unrecognized event: %s, with data: %s\n", event, data[:10])
  90. }
  91. }
  92. func (ser *Server) keepAliveThread() {
  93. defer ser.Stop()
  94. for {
  95. time.Sleep(time.Second * ser.alive)
  96. ser.Emit("pin", []byte(fmt.Sprintf("%d", time.Now().Unix())))
  97. }
  98. }