Server.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package tcp
  2. import (
  3. "../com"
  4. "fmt"
  5. "log"
  6. "net"
  7. "sync"
  8. "time"
  9. )
  10. type SClient struct {
  11. Address string
  12. Total uint
  13. Positive bool
  14. Con *Connect
  15. }
  16. type Server struct {
  17. port int
  18. listener net.Listener
  19. logger *log.Logger
  20. pullGap time.Duration
  21. pullLen time.Duration
  22. clients map[string]*SClient
  23. mutex sync.Mutex
  24. }
  25. func NewServer(port int, gap int, len int, logger *log.Logger) *Server {
  26. ser := Server{
  27. port: port, listener: nil,
  28. pullGap: time.Duration(gap),
  29. pullLen: time.Duration(len),
  30. logger: logger, clients: map[string]*SClient{},
  31. }
  32. return &ser
  33. }
  34. func (ser *Server) Start() {
  35. address := fmt.Sprintf("0.0.0.0:%d", ser.port)
  36. listen, err := net.Listen("tcp", address)
  37. if err != nil {
  38. ser.logger.Fatal("Server establish failed: ", err)
  39. }
  40. defer ser.stop()
  41. ser.listener = listen
  42. ser.logger.Printf("Server running on: tcp://%s", address)
  43. go ser.positiveThread()
  44. for {
  45. conn, err := listen.Accept()
  46. if err != nil {
  47. ser.logger.Printf("Failed to accept a client:\n%s", err)
  48. }
  49. go ser.newClient(conn)
  50. }
  51. }
  52. func (ser *Server) positiveThread() {
  53. for {
  54. time.Sleep(time.Second * ser.pullGap)
  55. for _, cli := range ser.clients {
  56. if !cli.Positive {
  57. cli.Con.Emit(com.Packet{Event: "start"})
  58. }
  59. }
  60. time.Sleep(time.Second * ser.pullLen)
  61. for _, cli := range ser.clients {
  62. if !cli.Positive {
  63. cli.Con.Emit(com.Packet{Event: "close"})
  64. }
  65. }
  66. }
  67. }
  68. func (ser *Server) stop() {
  69. if ser.listener == nil {
  70. return
  71. }
  72. for key, cli := range ser.clients {
  73. cli.Con.Close()
  74. delete(ser.clients, key)
  75. }
  76. err := ser.listener.Close()
  77. if err != nil {
  78. ser.logger.Fatal("Server close failed")
  79. }
  80. }
  81. func (ser *Server) newClient(con net.Conn) {
  82. conn := NewConnect(con, ser.logger)
  83. addr := con.RemoteAddr().String()
  84. client := SClient{
  85. Address: addr,
  86. Total: uint(0), Positive: false, Con: conn,
  87. }
  88. ser.mutex.Lock()
  89. ser.clients[addr] = &client
  90. ser.mutex.Unlock()
  91. defer ser.removeClient(addr)
  92. packetChan := make(chan com.Packet)
  93. go client.Con.Run(packetChan)
  94. for {
  95. select {
  96. case packet := <-packetChan:
  97. go ser.eventHandler(&client, packet)
  98. }
  99. }
  100. }
  101. func (ser *Server) removeClient(addr string) {
  102. ser.clients[addr].Con.Close()
  103. ser.mutex.Lock()
  104. delete(ser.clients, addr)
  105. ser.mutex.Unlock()
  106. }
  107. func (ser *Server) eventHandler(client *SClient, packet com.Packet) { // data maybe used in future use-case
  108. switch packet.Event {
  109. case "init":
  110. client.Positive = packet.Data[0] == 1
  111. ser.logger.Printf("Client connect, address: %s, positive: %t", client.Address, client.Positive)
  112. break
  113. case "pin":
  114. ser.logger.Printf("pin from [%s], seq: %d", client.Address, packet.Data[0])
  115. packet.Event, packet.Data[0] = "pon", packet.Data[0]+1
  116. client.Con.Emit(packet)
  117. break
  118. case "packet":
  119. client.Total++
  120. total, data := com.BytesToUint(packet.Data[:4]), packet.Data[4:]
  121. ser.logger.Printf(
  122. "packet from [%s], lost=%.3f, len=%d, data: %s",
  123. client.Address, float64(total-client.Total)/float64(total), len(data), data,
  124. )
  125. break
  126. case "error":
  127. ser.logger.Printf("client break, id: [%s]", client.Address)
  128. ser.removeClient(client.Address)
  129. break
  130. default:
  131. ser.logger.Printf("unrecognized event: %s, with data: %s", packet.Event, packet.Data)
  132. break
  133. }
  134. }