123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- package tcp
- import (
- "../com"
- "bytes"
- "log"
- "net"
- "sync"
- "time"
- )
- type Client struct {
- address string
- logger *log.Logger
- mutex sync.Mutex
- con *Connect
- positive bool
- working bool
- frequency time.Duration // f: 100ms
- }
- func NewClient(address string, positive bool, frequency int, logger *log.Logger) *Client {
- ret := Client{
- address: address,
- positive: positive, working: false,
- frequency: time.Duration(frequency * 100),
- logger: logger,
- }
- return &ret
- }
- func (cli *Client) Start() {
- defer cli.Close()
- conn, err := net.Dial("tcp", cli.address)
- if err != nil {
- cli.logger.Fatalf("Connection establish failed: %s", err)
- }
- cli.con = NewConnect(conn, cli.logger)
- cli.logger.Printf("Connection established, positive mode: %t.", cli.positive)
- data := make([]byte, 1)
- if cli.positive {
- data[0] = 1
- } else {
- data[0] = 0
- }
- cli.con.Emit(com.Packet{Event: "init", Data: data})
- if cli.positive {
- go cli.positiveThread()
- } else {
- go cli.transportThread()
- go cli.keepAliveThread()
- }
- packetChan := make(chan com.Packet)
- go cli.con.Run(packetChan)
- running := true
- for running {
- select {
- case packet := <-packetChan:
- if packet.Event == "error" {
- cli.logger.Printf("connect broken for remote server")
- running = false
- break
- } else {
- go cli.eventHandler(packet)
- }
- }
- }
- }
- func (cli *Client) Close() {
- cli.working = false
- cli.con.Close()
- }
- func (cli *Client) positiveThread() {
- send := uint(0)
- for {
- time.Sleep(time.Millisecond * cli.frequency)
- send++
- buffer := bytes.Buffer{}
- buffer.Write(com.UintToBytes(send))
- buffer.Write(com.RandBytesWithHeader())
- cli.con.Emit(com.Packet{Event: "packet", Data: buffer.Bytes()})
- }
- }
- func (cli *Client) transportThread() {
- send := uint(0)
- bytes.IndexByte(com.RandBytes(50), ',')
- for {
- time.Sleep(time.Millisecond * cli.frequency)
- if cli.working {
- send++
- buffer := bytes.Buffer{}
- buffer.Write(com.UintToBytes(send))
- buffer.Write(com.RandBytesWithHeader())
- cli.con.Emit(com.Packet{Event: "packet", Data: buffer.Bytes()})
- }
- }
- }
- func (cli *Client) keepAliveThread() {
- data := make([]byte, 1)
- data[0] = 0
- for {
- time.Sleep(time.Second * 60)
- data[0]++
- cli.con.Emit(com.Packet{Event: "pin", Data: data})
- }
- }
- func (cli *Client) eventHandler(packet com.Packet) { // data maybe used in future use-case
- switch packet.Event {
- case "pon":
- cli.logger.Printf("got pon, seq: %d", packet.Data[0])
- break
- case "start":
- if !cli.working {
- cli.working = true
- cli.logger.Printf("stream start")
- }
- break
- case "close":
- if cli.working {
- cli.working = false
- cli.logger.Printf("stream close")
- }
- break
- default:
- cli.logger.Printf("unrecognized event: %s, with data: %s", packet.Event, packet.Data)
- break
- }
- }
|