spliter.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package worker
  2. import (
  3. "Wine-Server/utils"
  4. "Wine-Server/utils/tables"
  5. "github.com/gin-gonic/gin"
  6. "github.com/gorilla/websocket"
  7. )
  8. func workerConnected(wid string, conn *websocket.Conn) *tables.WorkerTable {
  9. table := &tables.WorkerTable{Id: wid}
  10. err := table.Get()
  11. if err != nil {
  12. utils.Logger.Println("worker query failed:", err)
  13. _ = conn.WriteJSON(utils.WsError("worker query failed"))
  14. return nil
  15. }
  16. table.Last = utils.TimeNow()
  17. err = table.Update(utils.JsonType{"last": table.Last})
  18. if err != nil {
  19. utils.Logger.Println("update last active time failed:", err)
  20. _ = conn.WriteJSON(utils.WsError("Update last active time failed"))
  21. return nil
  22. }
  23. //keys := make([]string, 0, len(utils.WorkerWss))
  24. //for k := range utils.WorkerWss {
  25. // keys = append(keys, k)
  26. //}
  27. //utils.Logger.Println(wid, keys)
  28. //if _, exist := utils.WorkerWss[wid]; exist {
  29. // _ = conn.WriteJSON(utils.WsEvent("onePageOnly", nil))
  30. // return nil
  31. //}
  32. utils.WorkerLock.Lock()
  33. utils.WorkerWss[wid] = conn
  34. utils.WorkerLock.Unlock()
  35. utils.Logger.Printf("worker[%s] connected\n", wid)
  36. return table
  37. }
  38. func workerDisconnect(wid string) {
  39. utils.WorkerLock.Lock()
  40. delete(utils.WorkerWss, wid)
  41. utils.WorkerLock.Unlock()
  42. utils.Logger.Printf("worker[%s] disconnected\n", wid)
  43. }
  44. func loginSocket(conn *websocket.Conn) (string, error) {
  45. var wid string
  46. for {
  47. var msg utils.WsMsg
  48. err := conn.ReadJSON(&msg)
  49. if err != nil {
  50. _ = conn.Close()
  51. return "", err
  52. }
  53. switch msg.Event {
  54. case "pin":
  55. keepAlive(conn)
  56. break
  57. case "login":
  58. wid, err = login(conn, msg.Data)
  59. if err == nil {
  60. return wid, nil
  61. }
  62. break
  63. default:
  64. _ = conn.WriteJSON(utils.WsError("loginSocket event only"))
  65. break
  66. }
  67. }
  68. }
  69. func socketHandler(ctx *gin.Context) {
  70. conn, err := utils.UpgradeHttp2Ws.Upgrade(ctx.Writer, ctx.Request, nil)
  71. if err != nil {
  72. utils.Logger.Println("can't establish manager socket connect")
  73. ctx.JSON(utils.HttpError, utils.Fail("can't establish socket connect"))
  74. return
  75. }
  76. getSecrets(conn)
  77. var wid string
  78. token := ctx.Param("token")
  79. if token == "undefined" { // no token -> login
  80. wid, err = loginSocket(conn)
  81. } else {
  82. wid, err = utils.Redis.Get(utils.WxPayCli, token).Result()
  83. if err != nil { // expired -> login
  84. _ = conn.WriteJSON(utils.WsEvent("tokenExpired", nil))
  85. wid, err = loginSocket(conn)
  86. } else { // update expire time
  87. utils.Redis.Expire(utils.WxPayCli, token, utils.Duration(7*24*60*60))
  88. }
  89. }
  90. if err != nil {
  91. utils.Logger.Println(err)
  92. return
  93. }
  94. worker := workerConnected(wid, conn)
  95. if worker == nil {
  96. return
  97. }
  98. defer workerDisconnect(wid)
  99. info(conn, worker)
  100. workerConnected(wid, conn)
  101. defer workerDisconnect(wid)
  102. for {
  103. var msg utils.WsMsg
  104. err = conn.ReadJSON(&msg)
  105. if err != nil {
  106. return
  107. }
  108. switch msg.Event {
  109. case "pin":
  110. keepAlive(conn)
  111. break
  112. case "query":
  113. query(conn, wid, msg.Data)
  114. break
  115. case "detail":
  116. detail(conn, msg.Data)
  117. break
  118. case "ready":
  119. ready(conn, wid, msg.Data)
  120. break
  121. case "finish":
  122. finish(conn, worker, msg.Data)
  123. break
  124. default:
  125. _ = conn.WriteJSON(utils.WsError("unrecognized event"))
  126. break
  127. }
  128. }
  129. }