Client.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package tcp
  2. import (
  3. "../com"
  4. "bytes"
  5. "log"
  6. "net"
  7. "sync"
  8. "time"
  9. )
  10. type Client struct {
  11. address string
  12. logger *log.Logger
  13. mutex sync.Mutex
  14. con *Connect
  15. positive bool
  16. working bool
  17. frequency time.Duration // f: 100ms
  18. }
  19. func NewClient(address string, positive bool, frequency int, logger *log.Logger) *Client {
  20. ret := Client{
  21. address: address,
  22. positive: positive, working: false,
  23. frequency: time.Duration(frequency * 100),
  24. logger: logger,
  25. }
  26. return &ret
  27. }
  28. func (cli *Client) Start() {
  29. defer cli.Close()
  30. conn, err := net.Dial("tcp", cli.address)
  31. if err != nil {
  32. cli.logger.Fatalf("Connection establish failed: %s", err)
  33. }
  34. cli.con = NewConnect(conn, cli.logger)
  35. cli.logger.Printf("Connection established, positive mode: %t.", cli.positive)
  36. data := make([]byte, 1)
  37. if cli.positive {
  38. data[0] = 1
  39. } else {
  40. data[0] = 0
  41. }
  42. cli.con.Emit(com.Packet{Event: "init", Data: data})
  43. if cli.positive {
  44. go cli.positiveThread()
  45. } else {
  46. go cli.transportThread()
  47. go cli.keepAliveThread()
  48. }
  49. packetChan := make(chan com.Packet)
  50. go cli.con.Run(packetChan)
  51. running := true
  52. for running {
  53. select {
  54. case packet := <-packetChan:
  55. if packet.Event == "error" {
  56. cli.logger.Printf("connect broken for remote server")
  57. running = false
  58. break
  59. } else {
  60. go cli.eventHandler(packet)
  61. }
  62. }
  63. }
  64. }
  65. func (cli *Client) Close() {
  66. cli.working = false
  67. cli.con.Close()
  68. }
  69. func (cli *Client) positiveThread() {
  70. send := uint(0)
  71. for {
  72. time.Sleep(time.Millisecond * cli.frequency)
  73. send++
  74. buffer := bytes.Buffer{}
  75. buffer.Write(com.UintToBytes(send))
  76. buffer.Write(com.RandBytesWithHeader())
  77. cli.con.Emit(com.Packet{Event: "packet", Data: buffer.Bytes()})
  78. }
  79. }
  80. func (cli *Client) transportThread() {
  81. send := uint(0)
  82. bytes.IndexByte(com.RandBytes(50), ',')
  83. for {
  84. time.Sleep(time.Millisecond * cli.frequency)
  85. if cli.working {
  86. send++
  87. buffer := bytes.Buffer{}
  88. buffer.Write(com.UintToBytes(send))
  89. buffer.Write(com.RandBytesWithHeader())
  90. cli.con.Emit(com.Packet{Event: "packet", Data: buffer.Bytes()})
  91. }
  92. }
  93. }
  94. func (cli *Client) keepAliveThread() {
  95. data := make([]byte, 1)
  96. data[0] = 0
  97. for {
  98. time.Sleep(time.Second * 60)
  99. data[0]++
  100. cli.con.Emit(com.Packet{Event: "pin", Data: data})
  101. }
  102. }
  103. func (cli *Client) eventHandler(packet com.Packet) { // data maybe used in future use-case
  104. switch packet.Event {
  105. case "pon":
  106. cli.logger.Printf("got pon, seq: %d", packet.Data[0])
  107. break
  108. case "start":
  109. if !cli.working {
  110. cli.working = true
  111. cli.logger.Printf("stream start")
  112. }
  113. break
  114. case "close":
  115. if cli.working {
  116. cli.working = false
  117. cli.logger.Printf("stream close")
  118. }
  119. break
  120. default:
  121. cli.logger.Printf("unrecognized event: %s, with data: %s", packet.Event, packet.Data)
  122. break
  123. }
  124. }