package excservice import ( "errors" "fmt" "net/http" "net/http/httputil" "net/url" "sync" "time" "go.uber.org/zap" "go-admin/pkg/utility" "github.com/bytedance/sonic" log "github.com/go-admin-team/go-admin-core/logger" "github.com/gorilla/websocket" ) type WsConfig struct { WsUrl string ProxyUrl string ReqHeaders map[string][]string //连接的时候加入的头部信息 HeartbeatIntervalTime time.Duration // HeartbeatData func() []byte //心跳数据2 IsAutoReconnect bool ProtoHandleFunc func([]byte) error //协议处理函数 DecompressFunc func([]byte) ([]byte, error) //解压函数 ErrorHandleFunc func(err error) ConnectSuccessAfterSendMessage func() []byte //for reconnect IsDump bool readDeadLineTime time.Duration reconnectInterval time.Duration } var dialer = &websocket.Dialer{ Proxy: http.ProxyFromEnvironment, HandshakeTimeout: 30 * time.Second, EnableCompression: true, } type WsConn struct { c *websocket.Conn WsConfig writeBufferChan chan []byte pingMessageBufferChan chan []byte pongMessageBufferChan chan []byte closeMessageBufferChan chan []byte subs [][]byte close chan bool reConnectLock *sync.Mutex } type WsBuilder struct { wsConfig *WsConfig } func NewWsBuilder() *WsBuilder { return &WsBuilder{&WsConfig{ ReqHeaders: make(map[string][]string, 1), reconnectInterval: time.Second * 10, }} } func (b *WsBuilder) WsUrl(wsUrl string) *WsBuilder { b.wsConfig.WsUrl = wsUrl return b } func (b *WsBuilder) ProxyUrl(proxyUrl string) *WsBuilder { b.wsConfig.ProxyUrl = proxyUrl return b } func (b *WsBuilder) ReqHeader(key, value string) *WsBuilder { b.wsConfig.ReqHeaders[key] = append(b.wsConfig.ReqHeaders[key], value) return b } func (b *WsBuilder) AutoReconnect() *WsBuilder { b.wsConfig.IsAutoReconnect = true return b } func (b *WsBuilder) Dump() *WsBuilder { b.wsConfig.IsDump = true return b } func (b *WsBuilder) Heartbeat(heartbeat func() []byte, t time.Duration) *WsBuilder { b.wsConfig.HeartbeatIntervalTime = t b.wsConfig.HeartbeatData = heartbeat return b } func (b *WsBuilder) ReconnectInterval(t time.Duration) *WsBuilder { b.wsConfig.reconnectInterval = t return b } func (b *WsBuilder) ProtoHandleFunc(f func([]byte) error) *WsBuilder { b.wsConfig.ProtoHandleFunc = f return b } func (b *WsBuilder) DecompressFunc(f func([]byte) ([]byte, error)) *WsBuilder { b.wsConfig.DecompressFunc = f return b } func (b *WsBuilder) ErrorHandleFunc(f func(err error)) *WsBuilder { b.wsConfig.ErrorHandleFunc = f return b } func (b *WsBuilder) ConnectSuccessAfterSendMessage(msg func() []byte) *WsBuilder { b.wsConfig.ConnectSuccessAfterSendMessage = msg return b } func (b *WsBuilder) Build() *WsConn { wsConn := &WsConn{WsConfig: *b.wsConfig} return wsConn.NewWs() } func (ws *WsConn) NewWs() *WsConn { if ws.HeartbeatIntervalTime == 0 { ws.readDeadLineTime = time.Minute } else { ws.readDeadLineTime = ws.HeartbeatIntervalTime * 2 } if err := ws.connect(); err != nil { log.Error("[" + ws.WsUrl + "] " + err.Error()) return nil } ws.close = make(chan bool, 1) ws.pingMessageBufferChan = make(chan []byte, 10) ws.pongMessageBufferChan = make(chan []byte, 10) ws.closeMessageBufferChan = make(chan []byte, 10) ws.writeBufferChan = make(chan []byte, 10) ws.reConnectLock = new(sync.Mutex) go ws.writeRequest() go ws.receiveMessage() //if ws.ConnectSuccessAfterSendMessage != nil { // msg := ws.ConnectSuccessAfterSendMessage() // if msg != nil{ // ws.SendMessage(msg) // log.ErrorLogMsg("[ws] " + ws.WsUrl + " execute the connect success after send message=" + string(msg)) // }else { // log.ErrorLogMsg("执行重新连接后执行的登入函数[ws] " + ws.WsUrl + " ,send message=" + string(msg)) // } //} return ws } func (ws *WsConn) connect() error { const maxRetries = 5 // 最大重试次数 const retryDelay = 2 * time.Second // 每次重试的延迟时间 var wsConn *websocket.Conn var resp *http.Response var err error // 重试机制 for attempt := 1; attempt <= maxRetries; attempt++ { if ws.ProxyUrl != "" { proxy, err := url.Parse(ws.ProxyUrl) if err == nil { // log.Info("[ws][%s] proxy url:%s", zap.String("ws.WsUrl", ws.WsUrl)) dialer.Proxy = http.ProxyURL(proxy) } else { log.Error("[ws][" + ws.WsUrl + "] parse proxy url [" + ws.ProxyUrl + "] err: " + err.Error()) } } // 尝试连接 wsConn, resp, err = dialer.Dial(ws.WsUrl, http.Header(ws.ReqHeaders)) if err != nil { log.Error(fmt.Sprintf("[ws][%s] Dial attempt %d failed: %s", ws.WsUrl, attempt, err.Error())) // 如果开启了请求数据转储,打印响应信息 if ws.IsDump && resp != nil { dumpData, _ := httputil.DumpResponse(resp, true) log.Info(fmt.Sprintf("[ws][%s] Response dump: %s", ws.WsUrl, string(dumpData))) } // 达到最大重试次数,返回错误 if attempt == maxRetries { return fmt.Errorf("达到最大重试次数 [ws][%s]: %v", ws.WsUrl, err) } // 等待一段时间后重试 time.Sleep(retryDelay) } else { // 连接成功,退出循环 break } } // 设置读取超时时间 wsConn.SetReadDeadline(time.Now().Add(ws.readDeadLineTime)) // 如果开启了请求数据转储,打印响应信息 if ws.IsDump && resp != nil { dumpData, _ := httputil.DumpResponse(resp, true) log.Info(fmt.Sprintf("[ws][%s] Response dump: %s", ws.WsUrl, string(dumpData))) } // 记录连接成功的日志 log.Info("[ws][" + ws.WsUrl + "] connected") ws.c = wsConn return nil } func (ws *WsConn) reconnect() { ws.reConnectLock.Lock() defer ws.reConnectLock.Unlock() ws.c.Close() //主动关闭一次 var err error for retry := 1; retry <= 100; retry++ { err = ws.connect() if err != nil { log.Error("[ws] [" + ws.WsUrl + "] websocket reconnect fail , " + err.Error()) } else { break } time.Sleep(ws.WsConfig.reconnectInterval * time.Duration(retry)) } if err != nil { log.Error("[ws] [" + ws.WsUrl + "] retry connect 100 count fail , begin exiting. ") ws.CloseWs() if ws.ErrorHandleFunc != nil { ws.ErrorHandleFunc(errors.New("retry reconnect fail")) } } else { //re subscribe if ws.ConnectSuccessAfterSendMessage != nil { msg := ws.ConnectSuccessAfterSendMessage() if msg != nil { ws.SendMessage(msg) //log.ErrorLogMsg("[ws] " + ws.WsUrl + " execute the connect success after send message=" + string(msg)) } else { log.Error("执行重新连接后执行的登入函数[ws] " + ws.WsUrl + " ,send message=" + string(msg)) } //ws.SendMessage(msg) //log.InfoLog("[ws] [" + ws.WsUrl + "] execute the connect success after send message=" + string(msg)) time.Sleep(time.Second) //wait response } for _, sub := range ws.subs { log.Info("[ws] re subscribe: " + string(sub)) ws.SendMessage(sub) } } } func (ws *WsConn) writeRequest() { var ( heartTimer *time.Timer err error ) if ws.HeartbeatIntervalTime == 0 { heartTimer = time.NewTimer(time.Hour) } else { heartTimer = time.NewTimer(ws.HeartbeatIntervalTime) } for { select { case <-ws.close: log.Info("[ws][" + ws.WsUrl + "] close websocket , exiting write message goroutine.") return case d := <-ws.writeBufferChan: err = ws.c.WriteMessage(websocket.TextMessage, d) case d := <-ws.pingMessageBufferChan: err = ws.c.WriteMessage(websocket.PingMessage, d) case d := <-ws.pongMessageBufferChan: err = ws.c.WriteMessage(websocket.PongMessage, d) case d := <-ws.closeMessageBufferChan: err = ws.c.WriteMessage(websocket.CloseMessage, d) case <-heartTimer.C: if ws.HeartbeatIntervalTime > 0 { err = ws.c.WriteMessage(websocket.TextMessage, ws.HeartbeatData()) heartTimer.Reset(ws.HeartbeatIntervalTime) } } if err != nil { log.Info("[ws][" + ws.WsUrl + "] write message " + err.Error()) //time.Sleep(time.Second) } } } func (ws *WsConn) Subscribe(subEvent interface{}) error { data, err := sonic.Marshal(subEvent) if err != nil { log.Error("[ws]["+ws.WsUrl+"] json encode error , ", zap.Error(err)) return err } //ws.writeBufferChan <- data ws.SendMessage(data) ws.subs = append(ws.subs, data) return nil } func (ws *WsConn) SendMessage(msg []byte) { defer func() { //打印panic的错误信息 if err := recover(); err != nil { //产生了panic异常 fmt.Printf("SendMessage,panic: %s\r\n", err) } }() ws.writeBufferChan <- msg } func (ws *WsConn) SendPingMessage(msg []byte) { ws.pingMessageBufferChan <- msg } func (ws *WsConn) SendPongMessage(msg []byte) { ws.pongMessageBufferChan <- msg } func (ws *WsConn) SendCloseMessage(msg []byte) { ws.closeMessageBufferChan <- msg } func (ws *WsConn) SendJsonMessage(m interface{}) error { data, err := sonic.Marshal(m) if err != nil { return err } //ws.writeBufferChan <- data ws.SendMessage(data) return nil } func (ws *WsConn) receiveMessage() { //exit ws.c.SetCloseHandler(func(code int, text string) error { log.Info("[ws][" + ws.WsUrl + "] websocket exiting [code=" + utility.IntTostring(code) + " , text=" + text + "]") //ws.CloseWs() return nil }) ws.c.SetPongHandler(func(pong string) error { // log.Info("[" + ws.WsUrl + "] received [pong] " + pong) ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime)) return nil }) ws.c.SetPingHandler(func(ping string) error { // log.Info("[" + ws.WsUrl + "] received [ping] " + ping) ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime)) return nil }) for { select { case <-ws.close: log.Info("[ws][" + ws.WsUrl + "] close websocket , exiting receive message goroutine.") return default: t, msg, err := ws.c.ReadMessage() if err != nil { log.Info("ws.c.ReadMessage[ws][" + ws.WsUrl + "] " + err.Error()) if ws.IsAutoReconnect { log.Info("[ws][" + ws.WsUrl + "] Unexpected Closed , Begin Retry Connect.") ws.reconnect() continue } if ws.ErrorHandleFunc != nil { ws.ErrorHandleFunc(err) } return } // Log.Debug(string(msg)) ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime)) switch t { case websocket.TextMessage: ws.ProtoHandleFunc(msg) case websocket.BinaryMessage: if ws.DecompressFunc == nil { ws.ProtoHandleFunc(msg) } else { msg2, err := ws.DecompressFunc(msg) if err != nil { log.Error("[ws] decompress error " + ws.WsUrl + err.Error()) } else { ws.ProtoHandleFunc(msg2) } } // case websocket.CloseMessage: // ws.CloseWs() default: log.Error("[ws][" + ws.WsUrl + "] error websocket message type , content is " + string(msg)) } } } } func (ws *WsConn) CloseWs() { defer func() { //打印panic的错误信息 if err := recover(); err != nil { //产生了panic异常 fmt.Printf("CloseWs,panic: %s\r\n", err) } }() //ws.close <- true close(ws.close) close(ws.writeBufferChan) close(ws.closeMessageBufferChan) close(ws.pingMessageBufferChan) close(ws.pongMessageBufferChan) err := ws.c.Close() if err != nil { log.Error("CloseWs[ws]["+ws.WsUrl+"] close websocket error ,", zap.Error(err)) } } func (ws *WsConn) clearChannel(c chan struct{}) { for { if len(c) > 0 { <-c } else { break } } }