443 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			443 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| 
								 | 
							
								package excservice
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import (
							 | 
						||
| 
								 | 
							
									"errors"
							 | 
						||
| 
								 | 
							
									"fmt"
							 | 
						||
| 
								 | 
							
									"net/http"
							 | 
						||
| 
								 | 
							
									"net/http/httputil"
							 | 
						||
| 
								 | 
							
									"net/url"
							 | 
						||
| 
								 | 
							
									"sync"
							 | 
						||
| 
								 | 
							
									"time"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									"go.uber.org/zap"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									"go-admin/pkg/utility"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									"github.com/bytedance/sonic"
							 | 
						||
| 
								 | 
							
									log "github.com/go-admin-team/go-admin-core/logger"
							 | 
						||
| 
								 | 
							
									"github.com/gorilla/websocket"
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// WsConfig holds the settings for one websocket connection.
								type WsConfig struct {
									// Endpoint and handshake.
									WsUrl      string              // websocket endpoint handed to the dialer
									ProxyUrl   string              // optional proxy URL; an empty string means none is configured
									ReqHeaders map[string][]string // header values sent with the connection request

									// Heartbeat.
									HeartbeatIntervalTime time.Duration // heartbeat period; heartbeats are only sent when this is > 0
									HeartbeatData         func() []byte // produces the heartbeat payload

									IsAutoReconnect bool // reconnect after a read error instead of stopping

									// Callbacks.
									ProtoHandleFunc                func([]byte) error           // handles each received message
									DecompressFunc                 func([]byte) ([]byte, error) // decompresses binary messages before they are handled
									ErrorHandleFunc                func(err error)              // receives read errors and the final reconnect failure
									ConnectSuccessAfterSendMessage func() []byte                // message to send after a successful reconnect

									IsDump bool // log the HTTP response of the handshake

									readDeadLineTime  time.Duration // read deadline applied to the connection
									reconnectInterval time.Duration // base delay between reconnect attempts
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								var dialer = &websocket.Dialer{
							 | 
						||
| 
								 | 
							
									Proxy:             http.ProxyFromEnvironment,
							 | 
						||
| 
								 | 
							
									HandshakeTimeout:  30 * time.Second,
							 | 
						||
| 
								 | 
							
									EnableCompression: true,
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								type WsConn struct {
							 | 
						||
| 
								 | 
							
									c *websocket.Conn
							 | 
						||
| 
								 | 
							
									WsConfig
							 | 
						||
| 
								 | 
							
									writeBufferChan        chan []byte
							 | 
						||
| 
								 | 
							
									pingMessageBufferChan  chan []byte
							 | 
						||
| 
								 | 
							
									pongMessageBufferChan  chan []byte
							 | 
						||
| 
								 | 
							
									closeMessageBufferChan chan []byte
							 | 
						||
| 
								 | 
							
									subs                   [][]byte
							 | 
						||
| 
								 | 
							
									close                  chan bool
							 | 
						||
| 
								 | 
							
									reConnectLock          *sync.Mutex
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								type WsBuilder struct {
							 | 
						||
| 
								 | 
							
									wsConfig *WsConfig
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func NewWsBuilder() *WsBuilder {
							 | 
						||
| 
								 | 
							
									return &WsBuilder{&WsConfig{
							 | 
						||
| 
								 | 
							
										ReqHeaders:        make(map[string][]string, 1),
							 | 
						||
| 
								 | 
							
										reconnectInterval: time.Second * 10,
							 | 
						||
| 
								 | 
							
									}}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) WsUrl(wsUrl string) *WsBuilder {
							 | 
						||
| 
								 | 
							
									b.wsConfig.WsUrl = wsUrl
							 | 
						||
| 
								 | 
							
									return b
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) ProxyUrl(proxyUrl string) *WsBuilder {
							 | 
						||
| 
								 | 
							
									b.wsConfig.ProxyUrl = proxyUrl
							 | 
						||
| 
								 | 
							
									return b
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) ReqHeader(key, value string) *WsBuilder {
							 | 
						||
| 
								 | 
							
									b.wsConfig.ReqHeaders[key] = append(b.wsConfig.ReqHeaders[key], value)
							 | 
						||
| 
								 | 
							
									return b
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) AutoReconnect() *WsBuilder {
							 | 
						||
| 
								 | 
							
									b.wsConfig.IsAutoReconnect = true
							 | 
						||
| 
								 | 
							
									return b
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) Dump() *WsBuilder {
							 | 
						||
| 
								 | 
							
									b.wsConfig.IsDump = true
							 | 
						||
| 
								 | 
							
									return b
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) Heartbeat(heartbeat func() []byte, t time.Duration) *WsBuilder {
							 | 
						||
| 
								 | 
							
									b.wsConfig.HeartbeatIntervalTime = t
							 | 
						||
| 
								 | 
							
									b.wsConfig.HeartbeatData = heartbeat
							 | 
						||
| 
								 | 
							
									return b
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) ReconnectInterval(t time.Duration) *WsBuilder {
							 | 
						||
| 
								 | 
							
									b.wsConfig.reconnectInterval = t
							 | 
						||
| 
								 | 
							
									return b
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) ProtoHandleFunc(f func([]byte) error) *WsBuilder {
							 | 
						||
| 
								 | 
							
									b.wsConfig.ProtoHandleFunc = f
							 | 
						||
| 
								 | 
							
									return b
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) DecompressFunc(f func([]byte) ([]byte, error)) *WsBuilder {
							 | 
						||
| 
								 | 
							
									b.wsConfig.DecompressFunc = f
							 | 
						||
| 
								 | 
							
									return b
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) ErrorHandleFunc(f func(err error)) *WsBuilder {
							 | 
						||
| 
								 | 
							
									b.wsConfig.ErrorHandleFunc = f
							 | 
						||
| 
								 | 
							
									return b
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) ConnectSuccessAfterSendMessage(msg func() []byte) *WsBuilder {
							 | 
						||
| 
								 | 
							
									b.wsConfig.ConnectSuccessAfterSendMessage = msg
							 | 
						||
| 
								 | 
							
									return b
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (b *WsBuilder) Build() *WsConn {
							 | 
						||
| 
								 | 
							
									wsConn := &WsConn{WsConfig: *b.wsConfig}
							 | 
						||
| 
								 | 
							
									return wsConn.NewWs()
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) NewWs() *WsConn {
							 | 
						||
| 
								 | 
							
									if ws.HeartbeatIntervalTime == 0 {
							 | 
						||
| 
								 | 
							
										ws.readDeadLineTime = time.Minute
							 | 
						||
| 
								 | 
							
									} else {
							 | 
						||
| 
								 | 
							
										ws.readDeadLineTime = ws.HeartbeatIntervalTime * 2
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if err := ws.connect(); err != nil {
							 | 
						||
| 
								 | 
							
										log.Error("[" + ws.WsUrl + "] " + err.Error())
							 | 
						||
| 
								 | 
							
										return nil
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									ws.close = make(chan bool, 1)
							 | 
						||
| 
								 | 
							
									ws.pingMessageBufferChan = make(chan []byte, 10)
							 | 
						||
| 
								 | 
							
									ws.pongMessageBufferChan = make(chan []byte, 10)
							 | 
						||
| 
								 | 
							
									ws.closeMessageBufferChan = make(chan []byte, 10)
							 | 
						||
| 
								 | 
							
									ws.writeBufferChan = make(chan []byte, 10)
							 | 
						||
| 
								 | 
							
									ws.reConnectLock = new(sync.Mutex)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									go ws.writeRequest()
							 | 
						||
| 
								 | 
							
									go ws.receiveMessage()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									//if ws.ConnectSuccessAfterSendMessage != nil {
							 | 
						||
| 
								 | 
							
									//	msg := ws.ConnectSuccessAfterSendMessage()
							 | 
						||
| 
								 | 
							
									//	if msg != nil{
							 | 
						||
| 
								 | 
							
									//		ws.SendMessage(msg)
							 | 
						||
| 
								 | 
							
									//		log.ErrorLogMsg("[ws] " + ws.WsUrl + " execute the connect success after send message=" + string(msg))
							 | 
						||
| 
								 | 
							
									//	}else {
							 | 
						||
| 
								 | 
							
									//		log.ErrorLogMsg("执行重新连接后执行的登入函数[ws] " + ws.WsUrl + " ,send message=" + string(msg))
							 | 
						||
| 
								 | 
							
									//	}
							 | 
						||
| 
								 | 
							
									//}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return ws
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) connect() error {
							 | 
						||
| 
								 | 
							
									const maxRetries = 5               // 最大重试次数
							 | 
						||
| 
								 | 
							
									const retryDelay = 2 * time.Second // 每次重试的延迟时间
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									var wsConn *websocket.Conn
							 | 
						||
| 
								 | 
							
									var resp *http.Response
							 | 
						||
| 
								 | 
							
									var err error
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// 重试机制
							 | 
						||
| 
								 | 
							
									for attempt := 1; attempt <= maxRetries; attempt++ {
							 | 
						||
| 
								 | 
							
										if ws.ProxyUrl != "" {
							 | 
						||
| 
								 | 
							
											proxy, err := url.Parse(ws.ProxyUrl)
							 | 
						||
| 
								 | 
							
											if err == nil {
							 | 
						||
| 
								 | 
							
												// log.Info("[ws][%s] proxy url:%s", zap.String("ws.WsUrl", ws.WsUrl))
							 | 
						||
| 
								 | 
							
												dialer.Proxy = http.ProxyURL(proxy)
							 | 
						||
| 
								 | 
							
											} else {
							 | 
						||
| 
								 | 
							
												log.Error("[ws][" + ws.WsUrl + "] parse proxy url [" + ws.ProxyUrl + "] err: " + err.Error())
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										// 尝试连接
							 | 
						||
| 
								 | 
							
										wsConn, resp, err = dialer.Dial(ws.WsUrl, http.Header(ws.ReqHeaders))
							 | 
						||
| 
								 | 
							
										if err != nil {
							 | 
						||
| 
								 | 
							
											log.Error(fmt.Sprintf("[ws][%s] Dial attempt %d failed: %s", ws.WsUrl, attempt, err.Error()))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											// 如果开启了请求数据转储,打印响应信息
							 | 
						||
| 
								 | 
							
											if ws.IsDump && resp != nil {
							 | 
						||
| 
								 | 
							
												dumpData, _ := httputil.DumpResponse(resp, true)
							 | 
						||
| 
								 | 
							
												log.Info(fmt.Sprintf("[ws][%s] Response dump: %s", ws.WsUrl, string(dumpData)))
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											// 达到最大重试次数,返回错误
							 | 
						||
| 
								 | 
							
											if attempt == maxRetries {
							 | 
						||
| 
								 | 
							
												return fmt.Errorf("达到最大重试次数 [ws][%s]: %v", ws.WsUrl, err)
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											// 等待一段时间后重试
							 | 
						||
| 
								 | 
							
											time.Sleep(retryDelay)
							 | 
						||
| 
								 | 
							
										} else {
							 | 
						||
| 
								 | 
							
											// 连接成功,退出循环
							 | 
						||
| 
								 | 
							
											break
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// 设置读取超时时间
							 | 
						||
| 
								 | 
							
									wsConn.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// 如果开启了请求数据转储,打印响应信息
							 | 
						||
| 
								 | 
							
									if ws.IsDump && resp != nil {
							 | 
						||
| 
								 | 
							
										dumpData, _ := httputil.DumpResponse(resp, true)
							 | 
						||
| 
								 | 
							
										log.Info(fmt.Sprintf("[ws][%s] Response dump: %s", ws.WsUrl, string(dumpData)))
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// 记录连接成功的日志
							 | 
						||
| 
								 | 
							
									log.Info("[ws][" + ws.WsUrl + "] connected")
							 | 
						||
| 
								 | 
							
									ws.c = wsConn
							 | 
						||
| 
								 | 
							
									return nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) reconnect() {
							 | 
						||
| 
								 | 
							
									ws.reConnectLock.Lock()
							 | 
						||
| 
								 | 
							
									defer ws.reConnectLock.Unlock()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									ws.c.Close() //主动关闭一次
							 | 
						||
| 
								 | 
							
									var err error
							 | 
						||
| 
								 | 
							
									for retry := 1; retry <= 100; retry++ {
							 | 
						||
| 
								 | 
							
										err = ws.connect()
							 | 
						||
| 
								 | 
							
										if err != nil {
							 | 
						||
| 
								 | 
							
											log.Error("[ws] [" + ws.WsUrl + "] websocket reconnect fail , " + err.Error())
							 | 
						||
| 
								 | 
							
										} else {
							 | 
						||
| 
								 | 
							
											break
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										time.Sleep(ws.WsConfig.reconnectInterval * time.Duration(retry))
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										log.Error("[ws] [" + ws.WsUrl + "] retry connect 100 count fail , begin exiting. ")
							 | 
						||
| 
								 | 
							
										ws.CloseWs()
							 | 
						||
| 
								 | 
							
										if ws.ErrorHandleFunc != nil {
							 | 
						||
| 
								 | 
							
											ws.ErrorHandleFunc(errors.New("retry reconnect fail"))
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									} else {
							 | 
						||
| 
								 | 
							
										//re subscribe
							 | 
						||
| 
								 | 
							
										if ws.ConnectSuccessAfterSendMessage != nil {
							 | 
						||
| 
								 | 
							
											msg := ws.ConnectSuccessAfterSendMessage()
							 | 
						||
| 
								 | 
							
											if msg != nil {
							 | 
						||
| 
								 | 
							
												ws.SendMessage(msg)
							 | 
						||
| 
								 | 
							
												//log.ErrorLogMsg("[ws] " + ws.WsUrl + " execute the connect success after send message=" + string(msg))
							 | 
						||
| 
								 | 
							
											} else {
							 | 
						||
| 
								 | 
							
												log.Error("执行重新连接后执行的登入函数[ws] " + ws.WsUrl + " ,send message=" + string(msg))
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											//ws.SendMessage(msg)
							 | 
						||
| 
								 | 
							
											//log.InfoLog("[ws] [" + ws.WsUrl + "] execute the connect success after send message=" + string(msg))
							 | 
						||
| 
								 | 
							
											time.Sleep(time.Second) //wait response
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										for _, sub := range ws.subs {
							 | 
						||
| 
								 | 
							
											log.Info("[ws] re subscribe: " + string(sub))
							 | 
						||
| 
								 | 
							
											ws.SendMessage(sub)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) writeRequest() {
							 | 
						||
| 
								 | 
							
									var (
							 | 
						||
| 
								 | 
							
										heartTimer *time.Timer
							 | 
						||
| 
								 | 
							
										err        error
							 | 
						||
| 
								 | 
							
									)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if ws.HeartbeatIntervalTime == 0 {
							 | 
						||
| 
								 | 
							
										heartTimer = time.NewTimer(time.Hour)
							 | 
						||
| 
								 | 
							
									} else {
							 | 
						||
| 
								 | 
							
										heartTimer = time.NewTimer(ws.HeartbeatIntervalTime)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									for {
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case <-ws.close:
							 | 
						||
| 
								 | 
							
											log.Info("[ws][" + ws.WsUrl + "] close websocket , exiting write message goroutine.")
							 | 
						||
| 
								 | 
							
											return
							 | 
						||
| 
								 | 
							
										case d := <-ws.writeBufferChan:
							 | 
						||
| 
								 | 
							
											err = ws.c.WriteMessage(websocket.TextMessage, d)
							 | 
						||
| 
								 | 
							
										case d := <-ws.pingMessageBufferChan:
							 | 
						||
| 
								 | 
							
											err = ws.c.WriteMessage(websocket.PingMessage, d)
							 | 
						||
| 
								 | 
							
										case d := <-ws.pongMessageBufferChan:
							 | 
						||
| 
								 | 
							
											err = ws.c.WriteMessage(websocket.PongMessage, d)
							 | 
						||
| 
								 | 
							
										case d := <-ws.closeMessageBufferChan:
							 | 
						||
| 
								 | 
							
											err = ws.c.WriteMessage(websocket.CloseMessage, d)
							 | 
						||
| 
								 | 
							
										case <-heartTimer.C:
							 | 
						||
| 
								 | 
							
											if ws.HeartbeatIntervalTime > 0 {
							 | 
						||
| 
								 | 
							
												err = ws.c.WriteMessage(websocket.TextMessage, ws.HeartbeatData())
							 | 
						||
| 
								 | 
							
												heartTimer.Reset(ws.HeartbeatIntervalTime)
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										if err != nil {
							 | 
						||
| 
								 | 
							
											log.Info("[ws][" + ws.WsUrl + "] write message " + err.Error())
							 | 
						||
| 
								 | 
							
											//time.Sleep(time.Second)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) Subscribe(subEvent interface{}) error {
							 | 
						||
| 
								 | 
							
									data, err := sonic.Marshal(subEvent)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										log.Error("[ws]["+ws.WsUrl+"] json encode error , ", zap.Error(err))
							 | 
						||
| 
								 | 
							
										return err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									//ws.writeBufferChan <- data
							 | 
						||
| 
								 | 
							
									ws.SendMessage(data)
							 | 
						||
| 
								 | 
							
									ws.subs = append(ws.subs, data)
							 | 
						||
| 
								 | 
							
									return nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) SendMessage(msg []byte) {
							 | 
						||
| 
								 | 
							
									defer func() {
							 | 
						||
| 
								 | 
							
										//打印panic的错误信息
							 | 
						||
| 
								 | 
							
										if err := recover(); err != nil { //产生了panic异常
							 | 
						||
| 
								 | 
							
											fmt.Printf("SendMessage,panic: %s\r\n", err)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}()
							 | 
						||
| 
								 | 
							
									ws.writeBufferChan <- msg
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) SendPingMessage(msg []byte) {
							 | 
						||
| 
								 | 
							
									ws.pingMessageBufferChan <- msg
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) SendPongMessage(msg []byte) {
							 | 
						||
| 
								 | 
							
									ws.pongMessageBufferChan <- msg
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) SendCloseMessage(msg []byte) {
							 | 
						||
| 
								 | 
							
									ws.closeMessageBufferChan <- msg
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) SendJsonMessage(m interface{}) error {
							 | 
						||
| 
								 | 
							
									data, err := sonic.Marshal(m)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									//ws.writeBufferChan <- data
							 | 
						||
| 
								 | 
							
									ws.SendMessage(data)
							 | 
						||
| 
								 | 
							
									return nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) receiveMessage() {
							 | 
						||
| 
								 | 
							
									//exit
							 | 
						||
| 
								 | 
							
									ws.c.SetCloseHandler(func(code int, text string) error {
							 | 
						||
| 
								 | 
							
										log.Info("[ws][" + ws.WsUrl + "] websocket exiting [code=" + utility.IntTostring(code) + " , text=" + text + "]")
							 | 
						||
| 
								 | 
							
										//ws.CloseWs()
							 | 
						||
| 
								 | 
							
										return nil
							 | 
						||
| 
								 | 
							
									})
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									ws.c.SetPongHandler(func(pong string) error {
							 | 
						||
| 
								 | 
							
										// log.Info("[" + ws.WsUrl + "] received [pong] " + pong)
							 | 
						||
| 
								 | 
							
										ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
							 | 
						||
| 
								 | 
							
										return nil
							 | 
						||
| 
								 | 
							
									})
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									ws.c.SetPingHandler(func(ping string) error {
							 | 
						||
| 
								 | 
							
										// log.Info("[" + ws.WsUrl + "] received [ping] " + ping)
							 | 
						||
| 
								 | 
							
										ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
							 | 
						||
| 
								 | 
							
										return nil
							 | 
						||
| 
								 | 
							
									})
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									for {
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case <-ws.close:
							 | 
						||
| 
								 | 
							
											log.Info("[ws][" + ws.WsUrl + "] close websocket , exiting receive message goroutine.")
							 | 
						||
| 
								 | 
							
											return
							 | 
						||
| 
								 | 
							
										default:
							 | 
						||
| 
								 | 
							
											t, msg, err := ws.c.ReadMessage()
							 | 
						||
| 
								 | 
							
											if err != nil {
							 | 
						||
| 
								 | 
							
												log.Info("ws.c.ReadMessage[ws][" + ws.WsUrl + "] " + err.Error())
							 | 
						||
| 
								 | 
							
												if ws.IsAutoReconnect {
							 | 
						||
| 
								 | 
							
													log.Info("[ws][" + ws.WsUrl + "] Unexpected Closed , Begin Retry Connect.")
							 | 
						||
| 
								 | 
							
													ws.reconnect()
							 | 
						||
| 
								 | 
							
													continue
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
												if ws.ErrorHandleFunc != nil {
							 | 
						||
| 
								 | 
							
													ws.ErrorHandleFunc(err)
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
												return
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											//			Log.Debug(string(msg))
							 | 
						||
| 
								 | 
							
											ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
							 | 
						||
| 
								 | 
							
											switch t {
							 | 
						||
| 
								 | 
							
											case websocket.TextMessage:
							 | 
						||
| 
								 | 
							
												ws.ProtoHandleFunc(msg)
							 | 
						||
| 
								 | 
							
											case websocket.BinaryMessage:
							 | 
						||
| 
								 | 
							
												if ws.DecompressFunc == nil {
							 | 
						||
| 
								 | 
							
													ws.ProtoHandleFunc(msg)
							 | 
						||
| 
								 | 
							
												} else {
							 | 
						||
| 
								 | 
							
													msg2, err := ws.DecompressFunc(msg)
							 | 
						||
| 
								 | 
							
													if err != nil {
							 | 
						||
| 
								 | 
							
														log.Error("[ws] decompress error " + ws.WsUrl + err.Error())
							 | 
						||
| 
								 | 
							
													} else {
							 | 
						||
| 
								 | 
							
														ws.ProtoHandleFunc(msg2)
							 | 
						||
| 
								 | 
							
													}
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
												//	case websocket.CloseMessage:
							 | 
						||
| 
								 | 
							
												//	ws.CloseWs()
							 | 
						||
| 
								 | 
							
											default:
							 | 
						||
| 
								 | 
							
												log.Error("[ws][" + ws.WsUrl + "] error websocket message type , content is " + string(msg))
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) CloseWs() {
							 | 
						||
| 
								 | 
							
									defer func() {
							 | 
						||
| 
								 | 
							
										//打印panic的错误信息
							 | 
						||
| 
								 | 
							
										if err := recover(); err != nil { //产生了panic异常
							 | 
						||
| 
								 | 
							
											fmt.Printf("CloseWs,panic: %s\r\n", err)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}()
							 | 
						||
| 
								 | 
							
									//ws.close <- true
							 | 
						||
| 
								 | 
							
									close(ws.close)
							 | 
						||
| 
								 | 
							
									close(ws.writeBufferChan)
							 | 
						||
| 
								 | 
							
									close(ws.closeMessageBufferChan)
							 | 
						||
| 
								 | 
							
									close(ws.pingMessageBufferChan)
							 | 
						||
| 
								 | 
							
									close(ws.pongMessageBufferChan)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									err := ws.c.Close()
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										log.Error("CloseWs[ws]["+ws.WsUrl+"] close websocket error ,", zap.Error(err))
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (ws *WsConn) clearChannel(c chan struct{}) {
							 | 
						||
| 
								 | 
							
									for {
							 | 
						||
| 
								 | 
							
										if len(c) > 0 {
							 | 
						||
| 
								 | 
							
											<-c
							 | 
						||
| 
								 | 
							
										} else {
							 | 
						||
| 
								 | 
							
											break
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 |