// Package udp contains a server that listens on a UDP socket, decodes incoming
// datagrams into event packets and dispatches them to an event handler.
//
// NOTE(review): as written, this file does not type-check against its own
// declarations. It looks like a partially finished migration from a
// per-connection design to a single shared UDP socket — confirm intent.
// Concrete mismatches visible in this file:
//   - SClient declares no Id or Con field, but stop, positiveThread,
//     newClient, removeClient and eventHandler use them.
//   - Server declares no field named local, but stop reads ser.local.
//   - Server.clients is a map[string]*SClient, but newClient and removeClient
//     treat it as a slice (append, slice expressions).
//   - newClient's composite literal contains "Addr:," with no value.
//   - eventHandler takes (addr, packet) but its body uses an identifier
//     "client" that is not declared in the function; newClient also calls it
//     with (*SClient, com.Packet) instead of (*net.UDPAddr, *EventPacket).
package udp

import (
	// NOTE(review): relative import path; Go modules do not support these —
	// confirm the build mode this package is expected to use.
	"../com"
	"fmt"
	"log"
	"net"
	"sync"
	"time"
)

// SClient is the per-client record stored in Server.clients.
type SClient struct {
	// Total is incremented by eventHandler once per "packet" event.
	Total uint
	// Positive is not referenced anywhere in this file.
	Positive bool
	// Addr is the client's UDP address. Nothing in this file assigns it a
	// value (the only initialiser, in newClient, is incomplete).
	Addr *net.UDPAddr
	// packs maps a uint key to a *DataPacket; it is not referenced anywhere
	// in this file. DataPacket is declared elsewhere.
	packs map[uint]*DataPacket
}

// Server owns the listening UDP socket and the set of known clients.
type Server struct {
	// port is the UDP port Start binds on 0.0.0.0.
	port int
	// pullGap and pullLen are multiplied by time.Second in positiveThread, so
	// they hold a plain count of seconds rather than a ready-made Duration.
	pullGap time.Duration
	pullLen time.Duration
	logger  *log.Logger
	// conn is nil until Start succeeds in opening the listener.
	conn *net.UDPConn
	// clients is locked via mutex in newClient and removeClient only; stop and
	// positiveThread iterate it without taking the lock.
	clients map[string]*SClient
	mutex   sync.Mutex
}

// NewServer builds a Server that has not yet opened its socket.
//
// gap and len are stored via a direct time.Duration conversion (no unit
// scaling); positiveThread later multiplies them by time.Second. Note that the
// parameter named len shadows the builtin len inside this function.
func NewServer(port int, gap int, len int, logger *log.Logger) *Server {
	ser := Server{
		port:    port,
		conn:    nil,
		pullGap: time.Duration(gap),
		pullLen: time.Duration(len),
		logger:  logger,
		clients: map[string]*SClient{},
	}
	return &ser
}

// Start resolves 0.0.0.0:<port>, opens a UDP listener on it, stores the
// listener in ser.conn and launches receiveThread in a new goroutine.
//
// Either failure is reported through logger.Fatalf, which terminates the
// process after logging.
//
// NOTE(review): Start returns right after spawning the goroutine, so the
// deferred ser.stop() runs immediately while receiveThread is still using the
// socket. Verify whether Start was meant to block, or whether stop should be
// called by the owner instead.
func (ser *Server) Start() {
	defer ser.stop()
	addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("0.0.0.0:%d", ser.port))
	if err != nil {
		ser.logger.Fatalf("Error resolving address:\n%s", err)
	}
	conn, err := net.ListenUDP("udp", addr)
	if err != nil {
		ser.logger.Fatalf("Error creating UDP connection:\n%s", err)
	}
	ser.conn = conn
	ser.logger.Printf("Server running on: udp://%s", addr)
	go ser.receiveThread()
}

// receiveThread reads datagrams from ser.conn in an endless loop, decodes each
// one with UnpackEventPacket and hands it to eventHandler in a new goroutine.
//
// NOTE(review): a read error is logged and the loop continues; there is no
// exit path, so a permanent error (for example a closed socket) would be
// logged repeatedly without pause — confirm and add a termination condition.
func (ser *Server) receiveThread() {
	// Receive loop. A single buffer of PackSize bytes is reused for every read.
	buffer := make([]byte, PackSize)
	for {
		n, addr, err := ser.conn.ReadFromUDP(buffer)
		if err != nil {
			ser.logger.Printf("Error reading from connection:\n%s", err)
			continue
		}
		// NOTE(review): if UnpackEventPacket keeps a reference into buffer
		// rather than copying, the handler goroutine can race with the next
		// ReadFromUDP overwriting the same bytes — verify.
		packet := UnpackEventPacket(buffer[:n])
		go ser.eventHandler(addr, packet)
		// Disabled code: process the received data and echo it back to the client.
		//fmt.Printf("Received message from %s: %s\n", addr.String(), string(buffer[:n]))
		//_, err = conn.WriteToUDP(buffer[:n], addr)
		//if err != nil {
		//	fmt.Println("Error writing to connection:", err)
		//}
	}
}

// stop returns early when ser.local is nil; otherwise it calls Close on the
// Con of every entry in ser.clients.
//
// NOTE(review): Server has no local field and SClient has no Con field. The
// listener stored in ser.conn is never closed here, and ser.clients is read
// without holding ser.mutex.
func (ser *Server) stop() {
	if ser.local == nil {
		return
	}
	for _, client := range ser.clients {
		client.Con.Close()
	}
}

// positiveThread loops forever: it sleeps pullGap seconds, sends a "start"
// packet to every client via UdpDownEmit, sleeps pullLen seconds, then sends a
// "close" packet to every client via Emit.
//
// NOTE(review): nothing in this file starts this method, it has no exit
// condition, and it iterates ser.clients without holding ser.mutex. It also
// relies on the Con field that SClient does not declare.
func (ser *Server) positiveThread() {
	for {
		time.Sleep(time.Second * ser.pullGap)
		for _, cli := range ser.clients {
			cli.Con.UdpDownEmit(com.Packet{Event: "start"})
		}
		time.Sleep(time.Second * ser.pullLen)
		for _, cli := range ser.clients {
			cli.Con.Emit(com.Packet{Event: "close"})
		}
	}
}

// newClient wraps con with NewConnect, registers a new SClient under
// ser.mutex, starts the connection's Run loop in a goroutine and forwards
// every packet received on packetChan to eventHandler, each in its own
// goroutine.
//
// NOTE(review): the for/select below has a single case and no return, so the
// deferred removeClient only runs if the function panics. Nothing in this file
// calls newClient. See the inline notes for the parts that do not type-check.
func (ser *Server) newClient(con *net.UDPConn) {
	conn := NewConnect(con, ser.logger)
	// NOTE(review): Id and Con are not fields of SClient, and "Addr:," below
	// is missing its value (syntax error).
	client := SClient{
		// Id is 8 bytes from com.RandBytes converted to a string
		// (presumably random — confirm against the com package).
		Id:    string(com.RandBytes(8)),
		Total: uint(0),
		Con:   conn,
		Addr:,
	}
	ser.mutex.Lock()
	// NOTE(review): ser.clients is a map; append does not apply to it.
	ser.clients = append(ser.clients, &client)
	ser.mutex.Unlock()
	defer ser.removeClient(&client)
	packetChan := make(chan com.Packet)
	go client.Con.Run(packetChan)
	for {
		select {
		case packet := <-packetChan:
			// NOTE(review): argument types (*SClient, com.Packet) do not match
			// eventHandler's declared (*net.UDPAddr, *EventPacket).
			go ser.eventHandler(&client, packet)
		}
	}
}

// removeClient, while holding ser.mutex, searches ser.clients for the entry
// that is the same pointer as client, closes its Con and removes the entry.
//
// NOTE(review): the removal uses slice expressions, but ser.clients is a map
// keyed by string, so i is a string key here and the append/slicing does not
// type-check; delete(ser.clients, key) is presumably what is needed — confirm
// what the map key is meant to be.
func (ser *Server) removeClient(client *SClient) {
	ser.mutex.Lock()
	for i, c := range ser.clients {
		if c == client {
			client.Con.Close()
			ser.clients = append(ser.clients[:i], ser.clients[i+1:]...)
			break
		}
	}
	ser.mutex.Unlock()
}

// eventHandler dispatches on packet.Event:
//
//   - "init":   sets client.Id from packet.Data and logs the connection.
//   - "pin":    logs packet.Data[0] as a sequence number, rewrites the same
//     packet in place to event "pon" with Data[0] incremented, and emits it.
//   - "packet": increments client.Total, decodes Data[:4] with
//     com.BytesToUint as the sender's total, and logs the ratio
//     (total-client.Total)/total together with the payload Data[4:].
//   - "error":  logs and removes the client.
//   - default:  logs the unrecognised event and its data.
//
// NOTE(review): the addr parameter is never used, and "client" is not declared
// in this function; presumably the client should be looked up from addr —
// confirm. receiveThread runs this method in one goroutine per datagram, and
// the client state touched here (Id, Total) is modified without any lock.
func (ser *Server) eventHandler(addr *net.UDPAddr, packet *EventPacket) {
	// The packet data may be used in future use cases.
	// The break at the end of each case is redundant: Go switch cases do not
	// fall through.
	switch packet.Event {
	case "init":
		client.Id = string(packet.Data)
		ser.logger.Printf("Client connect, id: [%s], addr: (%s)", client.Id, client.Con.con.RemoteAddr())
		break
	case "pin":
		// NOTE(review): packet.Data[0] is accessed without a length check.
		ser.logger.Printf("pin from [%s], seq: %d", client.Id, packet.Data[0])
		packet.Event, packet.Data[0] = "pon", packet.Data[0]+1
		client.Con.Emit(packet)
		break
	case "packet":
		client.Total++
		// NOTE(review): Data[:4] and Data[4:] are taken without a length check.
		total, data := com.BytesToUint(packet.Data[:4]), packet.Data[4:]
		// NOTE(review): if total is an unsigned integer (client.Total is
		// uint), total-client.Total wraps around whenever more packets were
		// counted than the sender reports, and total == 0 makes the ratio
		// NaN or Inf — verify against com.BytesToUint's return type.
		ser.logger.Printf(
			"packet from [%s], lost: %.3f, len=%d, data: \"%s\"",
			client.Id, float64(total-client.Total)/float64(total), len(data), data,
		)
		break
	case "error":
		ser.logger.Printf("client break, id: [%s]", client.Id)
		ser.removeClient(client)
		break
	default:
		ser.logger.Printf("unrecognized event: %s, with data: %s", packet.Event, packet.Data)
		break
	}
}