package udp import ( "../com" "bytes" "log" "net" "sync" "time" ) type Client struct { address string port int positive bool frequency time.Duration // f: 100ms logger *log.Logger mutex sync.Mutex con *Connect working bool } func NewClient(address string, port int, positive bool, frequency int, logger *log.Logger) *Client { ret := Client{ address: address, port: port, positive: positive, logger: logger, frequency: time.Duration(frequency * 100), working: false, } return &ret } func (cli *Client) Start() { defer cli.Close() remote, err := net.ResolveUDPAddr("udp", cli.address) if err != nil { cli.logger.Fatalf("Connection establish failed: %s", err) } conn, err := net.DialUDP("udp", nil, remote) if err != nil { cli.logger.Fatalf("Connection establish failed: %s", err) } cli.con = NewConnect(conn, cli.logger) cli.logger.Printf("Connection established, positive mode: %t.", cli.positive) cli.con.Emit(com.Packet{Event: "init", Data: []byte(cli.id)}) go cli.keepAliveThread() if cli.positive { go cli.positiveThread() } else { go cli.transportThread() } packetChan := make(chan com.Packet) go cli.con.Run(packetChan) running := true for running { select { case packet := <-packetChan: if packet.Event == "error" { cli.logger.Printf("connect broken, id: [%s]", cli.id) running = false break } else { go cli.eventHandler(packet) } } } } func (cli *Client) Close() { cli.working = false cli.con.Close() } func (cli *Client) positiveThread() { send := uint(0) for { time.Sleep(time.Millisecond * cli.frequency) send++ buffer := bytes.Buffer{} buffer.Write(com.UintToBytes(send)) buffer.Write(com.RandBytesWithHeader()) cli.con.Emit(com.Packet{Event: "packet", Data: buffer.Bytes()}) } } func (cli *Client) transportThread() { send := uint(0) for { time.Sleep(time.Millisecond * cli.frequency) if cli.working { send++ buffer := bytes.Buffer{} buffer.Write(com.UintToBytes(send)) buffer.Write(com.RandBytesWithHeader()) cli.con.Emit(com.Packet{Event: "packet", Data: buffer.Bytes()}) } } } func (cli *Client) keepAliveThread() { data := make([]byte, 1) data[0] = 0 for { time.Sleep(time.Second * 60) data[0]++ cli.con.Emit(com.Packet{Event: "pin", Data: data}) } } func (cli *Client) eventHandler(packet com.Packet) { // data maybe used in future use-case switch packet.Event { case "pon": cli.logger.Printf("got pon, seq: %d", packet.Data[0]) break case "start": if !cli.positive && !cli.working { cli.working = true cli.logger.Printf("stream start") } break case "close": if !cli.positive && cli.working { cli.working = false cli.logger.Printf("stream close") } break default: cli.logger.Printf("unrecognized event: %s, with data: %s", packet.Event, packet.Data) break } }