Server.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package udp
  2. import (
  3. "../com"
  4. "fmt"
  5. "log"
  6. "net"
  7. "sync"
  8. "time"
  9. )
  10. type SClient struct {
  11. Total uint
  12. Positive bool
  13. Addr *net.UDPAddr
  14. packs map[uint]*DataPacket
  15. }
  16. type Server struct {
  17. port int
  18. pullGap time.Duration
  19. pullLen time.Duration
  20. logger *log.Logger
  21. conn *net.UDPConn
  22. clients map[string]*SClient
  23. mutex sync.Mutex
  24. }
  25. func NewServer(port int, gap int, len int, logger *log.Logger) *Server {
  26. ser := Server{
  27. port: port, conn: nil,
  28. pullGap: time.Duration(gap),
  29. pullLen: time.Duration(len),
  30. logger: logger,
  31. clients: map[string]*SClient{},
  32. }
  33. return &ser
  34. }
  35. func (ser *Server) Start() {
  36. defer ser.stop()
  37. addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("0.0.0.0:%d", ser.port))
  38. if err != nil {
  39. ser.logger.Fatalf("Error resolving address:\n%s", err)
  40. }
  41. conn, err := net.ListenUDP("udp", addr)
  42. if err != nil {
  43. ser.logger.Fatalf("Error creating UDP connection:\n%s", err)
  44. }
  45. ser.conn = conn
  46. ser.logger.Printf("Server running on: udp://%s", addr)
  47. go ser.receiveThread()
  48. }
  49. func (ser *Server) receiveThread() {
  50. // 接收数据循环
  51. buffer := make([]byte, PackSize)
  52. for {
  53. n, addr, err := ser.conn.ReadFromUDP(buffer)
  54. if err != nil {
  55. ser.logger.Printf("Error reading from connection:\n%s", err)
  56. continue
  57. }
  58. packet := UnpackEventPacket(buffer[:n])
  59. go ser.eventHandler(addr, packet)
  60. // 处理接收到的数据并回显给客户端
  61. //fmt.Printf("Received message from %s: %s\n", addr.String(), string(buffer[:n]))
  62. //_, err = conn.WriteToUDP(buffer[:n], addr)
  63. //if err != nil {
  64. // fmt.Println("Error writing to connection:", err)
  65. //}
  66. }
  67. }
  68. func (ser *Server) stop() {
  69. if ser.local == nil {
  70. return
  71. }
  72. for _, client := range ser.clients {
  73. client.Con.Close()
  74. }
  75. }
  76. func (ser *Server) positiveThread() {
  77. for {
  78. time.Sleep(time.Second * ser.pullGap)
  79. for _, cli := range ser.clients {
  80. cli.Con.UdpDownEmit(com.Packet{Event: "start"})
  81. }
  82. time.Sleep(time.Second * ser.pullLen)
  83. for _, cli := range ser.clients {
  84. cli.Con.Emit(com.Packet{Event: "close"})
  85. }
  86. }
  87. }
  88. func (ser *Server) newClient(con *net.UDPConn) {
  89. conn := NewConnect(con, ser.logger)
  90. client := SClient{
  91. Id: string(com.RandBytes(8)),
  92. Total: uint(0), Con: conn, Addr:,
  93. }
  94. ser.mutex.Lock()
  95. ser.clients = append(ser.clients, &client)
  96. ser.mutex.Unlock()
  97. defer ser.removeClient(&client)
  98. packetChan := make(chan com.Packet)
  99. go client.Con.Run(packetChan)
  100. for {
  101. select {
  102. case packet := <-packetChan:
  103. go ser.eventHandler(&client, packet)
  104. }
  105. }
  106. }
  107. func (ser *Server) removeClient(client *SClient) {
  108. ser.mutex.Lock()
  109. for i, c := range ser.clients {
  110. if c == client {
  111. client.Con.Close()
  112. ser.clients = append(ser.clients[:i], ser.clients[i+1:]...)
  113. break
  114. }
  115. }
  116. ser.mutex.Unlock()
  117. }
  118. func (ser *Server) eventHandler(addr *net.UDPAddr, packet *EventPacket) { // data maybe used in future use-case
  119. switch packet.Event {
  120. case "init":
  121. client.Id = string(packet.Data)
  122. ser.logger.Printf("Client connect, id: [%s], addr: (%s)", client.Id, client.Con.con.RemoteAddr())
  123. break
  124. case "pin":
  125. ser.logger.Printf("pin from [%s], seq: %d", client.Id, packet.Data[0])
  126. packet.Event, packet.Data[0] = "pon", packet.Data[0]+1
  127. client.Con.Emit(packet)
  128. break
  129. case "packet":
  130. client.Total++
  131. total, data := com.BytesToUint(packet.Data[:4]), packet.Data[4:]
  132. ser.logger.Printf(
  133. "packet from [%s], lost: %.3f, len=%d, data: \"%s\"",
  134. client.Id, float64(total-client.Total)/float64(total), len(data), data,
  135. )
  136. break
  137. case "error":
  138. ser.logger.Printf("client break, id: [%s]", client.Id)
  139. ser.removeClient(client)
  140. break
  141. default:
  142. ser.logger.Printf("unrecognized event: %s, with data: %s", packet.Event, packet.Data)
  143. break
  144. }
  145. }