123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- package worker
- import (
- "Wine-Server/utils"
- "Wine-Server/utils/tables"
- "github.com/gin-gonic/gin"
- "github.com/gorilla/websocket"
- )
- func workerConnected(wid string, conn *websocket.Conn) *tables.WorkerTable {
- table := &tables.WorkerTable{Id: wid}
- err := table.Get()
- if err != nil {
- utils.Logger.Println("worker query failed:", err)
- _ = conn.WriteJSON(utils.WsError("worker query failed"))
- return nil
- }
- table.Last = utils.TimeNow()
- err = table.Update(utils.JsonType{"last": table.Last})
- if err != nil {
- utils.Logger.Println("update last active time failed:", err)
- _ = conn.WriteJSON(utils.WsError("Update last active time failed"))
- return nil
- }
- //keys := make([]string, 0, len(utils.WorkerWss))
- //for k := range utils.WorkerWss {
- // keys = append(keys, k)
- //}
- //utils.Logger.Println(wid, keys)
- //if _, exist := utils.WorkerWss[wid]; exist {
- // _ = conn.WriteJSON(utils.WsEvent("onePageOnly", nil))
- // return nil
- //}
- utils.WorkerLock.Lock()
- utils.WorkerWss[wid] = conn
- utils.WorkerLock.Unlock()
- utils.Logger.Printf("worker[%s] connected\n", wid)
- return table
- }
- func workerDisconnect(wid string) {
- utils.WorkerLock.Lock()
- delete(utils.WorkerWss, wid)
- utils.WorkerLock.Unlock()
- utils.Logger.Printf("worker[%s] disconnected\n", wid)
- }
- func loginSocket(conn *websocket.Conn) (string, error) {
- var wid string
- for {
- var msg utils.WsMsg
- err := conn.ReadJSON(&msg)
- if err != nil {
- _ = conn.Close()
- return "", err
- }
- switch msg.Event {
- case "pin":
- keepAlive(conn)
- break
- case "login":
- wid, err = login(conn, msg.Data)
- if err == nil {
- return wid, nil
- }
- break
- default:
- _ = conn.WriteJSON(utils.WsError("loginSocket event only"))
- break
- }
- }
- }
- func socketHandler(ctx *gin.Context) {
- conn, err := utils.UpgradeHttp2Ws.Upgrade(ctx.Writer, ctx.Request, nil)
- if err != nil {
- utils.Logger.Println("can't establish manager socket connect")
- ctx.JSON(utils.HttpError, utils.Fail("can't establish socket connect"))
- return
- }
- getSecrets(conn)
- var wid string
- token := ctx.Param("token")
- if token == "undefined" { // no token -> login
- wid, err = loginSocket(conn)
- } else {
- wid, err = utils.Redis.Get(utils.WxPayCli, token).Result()
- if err != nil { // expired -> login
- _ = conn.WriteJSON(utils.WsEvent("tokenExpired", nil))
- wid, err = loginSocket(conn)
- } else { // update expire time
- utils.Redis.Expire(utils.WxPayCli, token, utils.Duration(7*24*60*60))
- }
- }
- if err != nil {
- utils.Logger.Println(err)
- return
- }
- worker := workerConnected(wid, conn)
- if worker == nil {
- return
- }
- defer workerDisconnect(wid)
- info(conn, worker)
- workerConnected(wid, conn)
- defer workerDisconnect(wid)
- for {
- var msg utils.WsMsg
- err = conn.ReadJSON(&msg)
- if err != nil {
- return
- }
- switch msg.Event {
- case "pin":
- keepAlive(conn)
- break
- case "query":
- query(conn, wid, msg.Data)
- break
- case "detail":
- detail(conn, msg.Data)
- break
- case "ready":
- ready(conn, wid, msg.Data)
- break
- case "finish":
- finish(conn, worker, msg.Data)
- break
- default:
- _ = conn.WriteJSON(utils.WsError("unrecognized event"))
- break
- }
- }
- }
|