123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- package udp
- import (
- "../com"
- "fmt"
- "log"
- "net"
- "sync"
- "time"
- )
- type SClient struct {
- Total uint
- Positive bool
- Addr *net.UDPAddr
- packs map[uint]*DataPacket
- }
- type Server struct {
- port int
- pullGap time.Duration
- pullLen time.Duration
- logger *log.Logger
- conn *net.UDPConn
- clients map[string]*SClient
- mutex sync.Mutex
- }
- func NewServer(port int, gap int, len int, logger *log.Logger) *Server {
- ser := Server{
- port: port, conn: nil,
- pullGap: time.Duration(gap),
- pullLen: time.Duration(len),
- logger: logger,
- clients: map[string]*SClient{},
- }
- return &ser
- }
- func (ser *Server) Start() {
- defer ser.stop()
- addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("0.0.0.0:%d", ser.port))
- if err != nil {
- ser.logger.Fatalf("Error resolving address:\n%s", err)
- }
- conn, err := net.ListenUDP("udp", addr)
- if err != nil {
- ser.logger.Fatalf("Error creating UDP connection:\n%s", err)
- }
- ser.conn = conn
- ser.logger.Printf("Server running on: udp://%s", addr)
- go ser.receiveThread()
- }
- func (ser *Server) receiveThread() {
- // 接收数据循环
- buffer := make([]byte, PackSize)
- for {
- n, addr, err := ser.conn.ReadFromUDP(buffer)
- if err != nil {
- ser.logger.Printf("Error reading from connection:\n%s", err)
- continue
- }
- packet := UnpackEventPacket(buffer[:n])
- go ser.eventHandler(addr, packet)
- // 处理接收到的数据并回显给客户端
- //fmt.Printf("Received message from %s: %s\n", addr.String(), string(buffer[:n]))
- //_, err = conn.WriteToUDP(buffer[:n], addr)
- //if err != nil {
- // fmt.Println("Error writing to connection:", err)
- //}
- }
- }
- func (ser *Server) stop() {
- if ser.local == nil {
- return
- }
- for _, client := range ser.clients {
- client.Con.Close()
- }
- }
- func (ser *Server) positiveThread() {
- for {
- time.Sleep(time.Second * ser.pullGap)
- for _, cli := range ser.clients {
- cli.Con.UdpDownEmit(com.Packet{Event: "start"})
- }
- time.Sleep(time.Second * ser.pullLen)
- for _, cli := range ser.clients {
- cli.Con.Emit(com.Packet{Event: "close"})
- }
- }
- }
- func (ser *Server) newClient(con *net.UDPConn) {
- conn := NewConnect(con, ser.logger)
- client := SClient{
- Id: string(com.RandBytes(8)),
- Total: uint(0), Con: conn, Addr:,
- }
- ser.mutex.Lock()
- ser.clients = append(ser.clients, &client)
- ser.mutex.Unlock()
- defer ser.removeClient(&client)
- packetChan := make(chan com.Packet)
- go client.Con.Run(packetChan)
- for {
- select {
- case packet := <-packetChan:
- go ser.eventHandler(&client, packet)
- }
- }
- }
- func (ser *Server) removeClient(client *SClient) {
- ser.mutex.Lock()
- for i, c := range ser.clients {
- if c == client {
- client.Con.Close()
- ser.clients = append(ser.clients[:i], ser.clients[i+1:]...)
- break
- }
- }
- ser.mutex.Unlock()
- }
- func (ser *Server) eventHandler(addr *net.UDPAddr, packet *EventPacket) { // data maybe used in future use-case
- switch packet.Event {
- case "init":
- client.Id = string(packet.Data)
- ser.logger.Printf("Client connect, id: [%s], addr: (%s)", client.Id, client.Con.con.RemoteAddr())
- break
- case "pin":
- ser.logger.Printf("pin from [%s], seq: %d", client.Id, packet.Data[0])
- packet.Event, packet.Data[0] = "pon", packet.Data[0]+1
- client.Con.Emit(packet)
- break
- case "packet":
- client.Total++
- total, data := com.BytesToUint(packet.Data[:4]), packet.Data[4:]
- ser.logger.Printf(
- "packet from [%s], lost: %.3f, len=%d, data: \"%s\"",
- client.Id, float64(total-client.Total)/float64(total), len(data), data,
- )
- break
- case "error":
- ser.logger.Printf("client break, id: [%s]", client.Id)
- ser.removeClient(client)
- break
- default:
- ser.logger.Printf("unrecognized event: %s, with data: %s", packet.Event, packet.Data)
- break
- }
- }
|