// Package tcp provides a TCP server that tracks connected clients and
// dispatches the event packets they send.
package tcp

import (
	"fmt"
	"log"
	"net"
	"sync"
	"time"

	"../com"
)

// SClient is the server-side record of one connected client.
type SClient struct {
	// Address is the remote address of the connection, used as the key in
	// Server.clients.
	Address string
	// Total counts the "packet" events received from this client.
	Total uint
	// Positive is set from the first data byte of the client's "init" event.
	// Clients with Positive == false receive the periodic "start"/"close"
	// events emitted by the server.
	Positive bool
	// Con is the connection wrapper used to talk to the client.
	Con *Connect
}

// Server accepts TCP clients and handles the events they send.
type Server struct {
	port     int
	listener net.Listener
	logger   *log.Logger
	// pullGap and pullLen hold a number of seconds (not a real duration);
	// they are multiplied by time.Second where they are used.
	pullGap time.Duration
	pullLen time.Duration
	// clients maps remote address to client record. Guarded by mutex.
	clients map[string]*SClient
	// mutex guards clients and the mutable fields of every SClient in it.
	mutex sync.Mutex
}

// NewServer builds a Server that will listen on port. gap is the number of
// seconds to wait before emitting "start" to non-positive clients and len is
// the number of seconds between that "start" and the following "close".
// All log output goes to logger.
func NewServer(port int, gap int, len int, logger *log.Logger) *Server {
	return &Server{
		port:     port,
		listener: nil,
		pullGap:  time.Duration(gap),
		pullLen:  time.Duration(len),
		logger:   logger,
		clients:  map[string]*SClient{},
	}
}

// Start listens on 0.0.0.0:<port> and serves clients until the process exits.
// A failure to listen is fatal. Each accepted connection is handled in its own
// goroutine.
func (ser *Server) Start() {
	address := fmt.Sprintf("0.0.0.0:%d", ser.port)
	listen, err := net.Listen("tcp", address)
	if err != nil {
		ser.logger.Fatal("Server establish failed: ", err)
	}
	defer ser.stop()
	ser.listener = listen
	ser.logger.Printf("Server running on: tcp://%s", address)

	go ser.positiveThread()
	for {
		conn, err := listen.Accept()
		if err != nil {
			ser.logger.Printf("Failed to accept a client:\n%s", err)
			// conn is nil on error; handing it to newClient would panic.
			continue
		}
		go ser.newClient(conn)
	}
}

// positiveThread loops forever: it waits pullGap seconds, emits "start" to
// every non-positive client, waits pullLen seconds, then emits "close" to
// every non-positive client.
func (ser *Server) positiveThread() {
	for {
		time.Sleep(time.Second * ser.pullGap)
		ser.emitToPassive(com.Packet{Event: "start"})
		time.Sleep(time.Second * ser.pullLen)
		ser.emitToPassive(com.Packet{Event: "close"})
	}
}

// emitToPassive sends packet to every client whose Positive flag is false.
func (ser *Server) emitToPassive(packet com.Packet) {
	// Snapshot the targets under the lock, then emit outside it so network
	// I/O never blocks goroutines that add or remove clients.
	ser.mutex.Lock()
	targets := make([]*Connect, 0, len(ser.clients))
	for _, cli := range ser.clients {
		if !cli.Positive {
			targets = append(targets, cli.Con)
		}
	}
	ser.mutex.Unlock()

	for _, con := range targets {
		con.Emit(packet)
	}
}

// stop closes every client connection, empties the client table and closes the
// listener. It does nothing if the server never started listening. A failure
// to close the listener is fatal.
func (ser *Server) stop() {
	if ser.listener == nil {
		return
	}

	ser.mutex.Lock()
	conns := make([]*Connect, 0, len(ser.clients))
	for key, cli := range ser.clients {
		conns = append(conns, cli.Con)
		delete(ser.clients, key)
	}
	ser.mutex.Unlock()

	for _, con := range conns {
		con.Close()
	}

	if err := ser.listener.Close(); err != nil {
		ser.logger.Fatal("Server close failed: ", err)
	}
}

// newClient registers the connection as a client, starts its reader and
// dispatches every packet it delivers to eventHandler in a separate goroutine.
// The client is removed when the packet channel is closed.
func (ser *Server) newClient(con net.Conn) {
	conn := NewConnect(con, ser.logger)
	addr := con.RemoteAddr().String()
	client := &SClient{
		Address:  addr,
		Total:    uint(0),
		Positive: false,
		Con:      conn,
	}

	ser.mutex.Lock()
	ser.clients[addr] = client
	ser.mutex.Unlock()
	defer ser.removeClient(addr)

	packetChan := make(chan com.Packet)
	go client.Con.Run(packetChan)
	// Ranging (rather than a bare receive in an endless loop) terminates if
	// the channel is ever closed instead of spinning on zero-value packets.
	for packet := range packetChan {
		go ser.eventHandler(client, packet)
	}
}

// removeClient closes the connection registered under addr and drops it from
// the client table. It is safe to call more than once for the same address;
// calls after the first are no-ops.
func (ser *Server) removeClient(addr string) {
	ser.mutex.Lock()
	client, ok := ser.clients[addr]
	delete(ser.clients, addr)
	ser.mutex.Unlock()

	if !ok {
		return
	}
	client.Con.Close()
}

// eventHandler processes one packet received from client.
//
// Recognized events:
//   - "init":   records whether the client is positive (first data byte == 1).
//   - "pin":    replies with a "pon" packet whose first data byte is incremented.
//   - "packet": counts the packet and logs the loss ratio computed from the
//     counter carried in the first four data bytes.
//   - "error":  removes the client.
//
// Packets whose payload is too short for their event are logged and dropped.
func (ser *Server) eventHandler(client *SClient, packet com.Packet) {
	// data maybe used in future use-case
	switch packet.Event {
	case "init":
		if len(packet.Data) < 1 {
			ser.logger.Printf("malformed init from [%s]: empty data", client.Address)
			return
		}
		positive := packet.Data[0] == 1
		ser.mutex.Lock()
		client.Positive = positive
		ser.mutex.Unlock()
		ser.logger.Printf("Client connect, address: %s, positive: %t", client.Address, positive)

	case "pin":
		if len(packet.Data) < 1 {
			ser.logger.Printf("malformed pin from [%s]: empty data", client.Address)
			return
		}
		ser.logger.Printf("pin from [%s], seq: %d", client.Address, packet.Data[0])
		packet.Event, packet.Data[0] = "pon", packet.Data[0]+1
		client.Con.Emit(packet)

	case "packet":
		if len(packet.Data) < 4 {
			ser.logger.Printf("malformed packet from [%s]: %d data bytes", client.Address, len(packet.Data))
			return
		}
		// Handlers run concurrently per packet, so the counter is updated
		// under the lock and a private copy is used afterwards.
		ser.mutex.Lock()
		client.Total++
		received := client.Total
		ser.mutex.Unlock()

		total, data := com.BytesToUint(packet.Data[:4]), packet.Data[4:]
		// Computed in floating point so that received > total cannot wrap
		// around as an unsigned subtraction; total == 0 yields 0 rather
		// than dividing by zero.
		lost := 0.0
		if total != 0 {
			lost = (float64(total) - float64(received)) / float64(total)
		}
		ser.logger.Printf(
			"packet from [%s], lost=%.3f, len=%d, data: %s",
			client.Address,
			lost,
			len(data),
			data,
		)

	case "error":
		ser.logger.Printf("client break, id: [%s]", client.Address)
		ser.removeClient(client.Address)

	default:
		ser.logger.Printf("unrecognized event: %s, with data: %s", packet.Event, packet.Data)
	}
}