123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- package tcp
- import (
- "../com"
- "fmt"
- "log"
- "net"
- "sync"
- "time"
- )
- type SClient struct {
- Address string
- Total uint
- Positive bool
- Con *Connect
- }
- type Server struct {
- port int
- listener net.Listener
- logger *log.Logger
- pullGap time.Duration
- pullLen time.Duration
- clients map[string]*SClient
- mutex sync.Mutex
- }
- func NewServer(port int, gap int, len int, logger *log.Logger) *Server {
- ser := Server{
- port: port, listener: nil,
- pullGap: time.Duration(gap),
- pullLen: time.Duration(len),
- logger: logger, clients: map[string]*SClient{},
- }
- return &ser
- }
- func (ser *Server) Start() {
- address := fmt.Sprintf("0.0.0.0:%d", ser.port)
- listen, err := net.Listen("tcp", address)
- if err != nil {
- ser.logger.Fatal("Server establish failed: ", err)
- }
- defer ser.stop()
- ser.listener = listen
- ser.logger.Printf("Server running on: tcp://%s", address)
- go ser.positiveThread()
- for {
- conn, err := listen.Accept()
- if err != nil {
- ser.logger.Printf("Failed to accept a client:\n%s", err)
- }
- go ser.newClient(conn)
- }
- }
- func (ser *Server) positiveThread() {
- for {
- time.Sleep(time.Second * ser.pullGap)
- for _, cli := range ser.clients {
- if !cli.Positive {
- cli.Con.Emit(com.Packet{Event: "start"})
- }
- }
- time.Sleep(time.Second * ser.pullLen)
- for _, cli := range ser.clients {
- if !cli.Positive {
- cli.Con.Emit(com.Packet{Event: "close"})
- }
- }
- }
- }
- func (ser *Server) stop() {
- if ser.listener == nil {
- return
- }
- for key, cli := range ser.clients {
- cli.Con.Close()
- delete(ser.clients, key)
- }
- err := ser.listener.Close()
- if err != nil {
- ser.logger.Fatal("Server close failed")
- }
- }
- func (ser *Server) newClient(con net.Conn) {
- conn := NewConnect(con, ser.logger)
- addr := con.RemoteAddr().String()
- client := SClient{
- Address: addr,
- Total: uint(0), Positive: false, Con: conn,
- }
- ser.mutex.Lock()
- ser.clients[addr] = &client
- ser.mutex.Unlock()
- defer ser.removeClient(addr)
- packetChan := make(chan com.Packet)
- go client.Con.Run(packetChan)
- for {
- select {
- case packet := <-packetChan:
- go ser.eventHandler(&client, packet)
- }
- }
- }
- func (ser *Server) removeClient(addr string) {
- ser.clients[addr].Con.Close()
- ser.mutex.Lock()
- delete(ser.clients, addr)
- ser.mutex.Unlock()
- }
- func (ser *Server) eventHandler(client *SClient, packet com.Packet) { // data maybe used in future use-case
- switch packet.Event {
- case "init":
- client.Positive = packet.Data[0] == 1
- ser.logger.Printf("Client connect, address: %s, positive: %t", client.Address, client.Positive)
- break
- case "pin":
- ser.logger.Printf("pin from [%s], seq: %d", client.Address, packet.Data[0])
- packet.Event, packet.Data[0] = "pon", packet.Data[0]+1
- client.Con.Emit(packet)
- break
- case "packet":
- client.Total++
- total, data := com.BytesToUint(packet.Data[:4]), packet.Data[4:]
- ser.logger.Printf(
- "packet from [%s], lost=%.3f, len=%d, data: %s",
- client.Address, float64(total-client.Total)/float64(total), len(data), data,
- )
- break
- case "error":
- ser.logger.Printf("client break, id: [%s]", client.Address)
- ser.removeClient(client.Address)
- break
- default:
- ser.logger.Printf("unrecognized event: %s, with data: %s", packet.Event, packet.Data)
- break
- }
- }
|