Client.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package udp
  2. import (
  3. "../com"
  4. "bytes"
  5. "log"
  6. "net"
  7. "sync"
  8. "time"
  9. )
  10. type Client struct {
  11. address string
  12. port int
  13. positive bool
  14. frequency time.Duration // f: 100ms
  15. logger *log.Logger
  16. mutex sync.Mutex
  17. con *Connect
  18. working bool
  19. }
  20. func NewClient(address string, port int, positive bool, frequency int, logger *log.Logger) *Client {
  21. ret := Client{
  22. address: address, port: port,
  23. positive: positive, logger: logger,
  24. frequency: time.Duration(frequency * 100),
  25. working: false,
  26. }
  27. return &ret
  28. }
  29. func (cli *Client) Start() {
  30. defer cli.Close()
  31. remote, err := net.ResolveUDPAddr("udp", cli.address)
  32. if err != nil {
  33. cli.logger.Fatalf("Connection establish failed: %s", err)
  34. }
  35. conn, err := net.DialUDP("udp", nil, remote)
  36. if err != nil {
  37. cli.logger.Fatalf("Connection establish failed: %s", err)
  38. }
  39. cli.con = NewConnect(conn, cli.logger)
  40. cli.logger.Printf("Connection established, positive mode: %t.", cli.positive)
  41. cli.con.Emit(com.Packet{Event: "init", Data: []byte(cli.id)})
  42. go cli.keepAliveThread()
  43. if cli.positive {
  44. go cli.positiveThread()
  45. } else {
  46. go cli.transportThread()
  47. }
  48. packetChan := make(chan com.Packet)
  49. go cli.con.Run(packetChan)
  50. running := true
  51. for running {
  52. select {
  53. case packet := <-packetChan:
  54. if packet.Event == "error" {
  55. cli.logger.Printf("connect broken, id: [%s]", cli.id)
  56. running = false
  57. break
  58. } else {
  59. go cli.eventHandler(packet)
  60. }
  61. }
  62. }
  63. }
  64. func (cli *Client) Close() {
  65. cli.working = false
  66. cli.con.Close()
  67. }
  68. func (cli *Client) positiveThread() {
  69. send := uint(0)
  70. for {
  71. time.Sleep(time.Millisecond * cli.frequency)
  72. send++
  73. buffer := bytes.Buffer{}
  74. buffer.Write(com.UintToBytes(send))
  75. buffer.Write(com.RandBytesWithHeader())
  76. cli.con.Emit(com.Packet{Event: "packet", Data: buffer.Bytes()})
  77. }
  78. }
  79. func (cli *Client) transportThread() {
  80. send := uint(0)
  81. for {
  82. time.Sleep(time.Millisecond * cli.frequency)
  83. if cli.working {
  84. send++
  85. buffer := bytes.Buffer{}
  86. buffer.Write(com.UintToBytes(send))
  87. buffer.Write(com.RandBytesWithHeader())
  88. cli.con.Emit(com.Packet{Event: "packet", Data: buffer.Bytes()})
  89. }
  90. }
  91. }
  92. func (cli *Client) keepAliveThread() {
  93. data := make([]byte, 1)
  94. data[0] = 0
  95. for {
  96. time.Sleep(time.Second * 60)
  97. data[0]++
  98. cli.con.Emit(com.Packet{Event: "pin", Data: data})
  99. }
  100. }
  101. func (cli *Client) eventHandler(packet com.Packet) { // data maybe used in future use-case
  102. switch packet.Event {
  103. case "pon":
  104. cli.logger.Printf("got pon, seq: %d", packet.Data[0])
  105. break
  106. case "start":
  107. if !cli.positive && !cli.working {
  108. cli.working = true
  109. cli.logger.Printf("stream start")
  110. }
  111. break
  112. case "close":
  113. if !cli.positive && cli.working {
  114. cli.working = false
  115. cli.logger.Printf("stream close")
  116. }
  117. break
  118. default:
  119. cli.logger.Printf("unrecognized event: %s, with data: %s", packet.Event, packet.Data)
  120. break
  121. }
  122. }