443 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			443 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package excservice
 | 
						|
 | 
						|
import (
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"net/http"
 | 
						|
	"net/http/httputil"
 | 
						|
	"net/url"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"go.uber.org/zap"
 | 
						|
 | 
						|
	"go-admin/pkg/utility"
 | 
						|
 | 
						|
	"github.com/bytedance/sonic"
 | 
						|
	log "github.com/go-admin-team/go-admin-core/logger"
 | 
						|
	"github.com/gorilla/websocket"
 | 
						|
)
 | 
						|
 | 
						|
// WsConfig holds the settings of a single websocket connection.
// Exported fields are normally filled in through WsBuilder; the unexported
// ones are set by NewWsBuilder / NewWs.
type WsConfig struct {
	// WsUrl is the endpoint handed to the websocket dialer.
	WsUrl string
	// ProxyUrl, when non-empty, is parsed and used as the HTTP proxy for the handshake.
	ProxyUrl string
	// ReqHeaders are the headers added to the handshake request.
	ReqHeaders map[string][]string
	// HeartbeatIntervalTime is the period between heartbeat writes; heartbeats
	// are only sent when it is greater than zero.
	HeartbeatIntervalTime time.Duration
	// HeartbeatData produces the payload written as a text message on each heartbeat.
	HeartbeatData func() []byte
	// IsAutoReconnect makes the read loop reconnect after a read error.
	IsAutoReconnect bool
	// ProtoHandleFunc receives every text message and every binary message
	// (after DecompressFunc, when one is set).
	ProtoHandleFunc func([]byte) error
	// DecompressFunc, when set, is applied to binary messages before they are
	// passed to ProtoHandleFunc.
	DecompressFunc func([]byte) ([]byte, error)
	// ErrorHandleFunc is called with the read error when auto-reconnect is off,
	// and with a generic error when reconnecting finally gives up.
	ErrorHandleFunc func(err error)
	// ConnectSuccessAfterSendMessage builds a message that is sent right after
	// a successful reconnect, before the stored subscriptions are replayed.
	ConnectSuccessAfterSendMessage func() []byte
	// IsDump enables logging of the handshake HTTP response.
	IsDump bool
	// readDeadLineTime is how far the read deadline is pushed forward on activity.
	readDeadLineTime time.Duration
	// reconnectInterval is the base delay between reconnect attempts; it is
	// multiplied by the attempt number.
	reconnectInterval time.Duration
}
 | 
						|
 | 
						|
var dialer = &websocket.Dialer{
 | 
						|
	Proxy:             http.ProxyFromEnvironment,
 | 
						|
	HandshakeTimeout:  30 * time.Second,
 | 
						|
	EnableCompression: true,
 | 
						|
}
 | 
						|
 | 
						|
type WsConn struct {
 | 
						|
	c *websocket.Conn
 | 
						|
	WsConfig
 | 
						|
	writeBufferChan        chan []byte
 | 
						|
	pingMessageBufferChan  chan []byte
 | 
						|
	pongMessageBufferChan  chan []byte
 | 
						|
	closeMessageBufferChan chan []byte
 | 
						|
	subs                   [][]byte
 | 
						|
	close                  chan bool
 | 
						|
	reConnectLock          *sync.Mutex
 | 
						|
}
 | 
						|
 | 
						|
type WsBuilder struct {
 | 
						|
	wsConfig *WsConfig
 | 
						|
}
 | 
						|
 | 
						|
func NewWsBuilder() *WsBuilder {
 | 
						|
	return &WsBuilder{&WsConfig{
 | 
						|
		ReqHeaders:        make(map[string][]string, 1),
 | 
						|
		reconnectInterval: time.Second * 10,
 | 
						|
	}}
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) WsUrl(wsUrl string) *WsBuilder {
 | 
						|
	b.wsConfig.WsUrl = wsUrl
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) ProxyUrl(proxyUrl string) *WsBuilder {
 | 
						|
	b.wsConfig.ProxyUrl = proxyUrl
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) ReqHeader(key, value string) *WsBuilder {
 | 
						|
	b.wsConfig.ReqHeaders[key] = append(b.wsConfig.ReqHeaders[key], value)
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) AutoReconnect() *WsBuilder {
 | 
						|
	b.wsConfig.IsAutoReconnect = true
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) Dump() *WsBuilder {
 | 
						|
	b.wsConfig.IsDump = true
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) Heartbeat(heartbeat func() []byte, t time.Duration) *WsBuilder {
 | 
						|
	b.wsConfig.HeartbeatIntervalTime = t
 | 
						|
	b.wsConfig.HeartbeatData = heartbeat
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) ReconnectInterval(t time.Duration) *WsBuilder {
 | 
						|
	b.wsConfig.reconnectInterval = t
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) ProtoHandleFunc(f func([]byte) error) *WsBuilder {
 | 
						|
	b.wsConfig.ProtoHandleFunc = f
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) DecompressFunc(f func([]byte) ([]byte, error)) *WsBuilder {
 | 
						|
	b.wsConfig.DecompressFunc = f
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) ErrorHandleFunc(f func(err error)) *WsBuilder {
 | 
						|
	b.wsConfig.ErrorHandleFunc = f
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) ConnectSuccessAfterSendMessage(msg func() []byte) *WsBuilder {
 | 
						|
	b.wsConfig.ConnectSuccessAfterSendMessage = msg
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *WsBuilder) Build() *WsConn {
 | 
						|
	wsConn := &WsConn{WsConfig: *b.wsConfig}
 | 
						|
	return wsConn.NewWs()
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) NewWs() *WsConn {
 | 
						|
	if ws.HeartbeatIntervalTime == 0 {
 | 
						|
		ws.readDeadLineTime = time.Minute
 | 
						|
	} else {
 | 
						|
		ws.readDeadLineTime = ws.HeartbeatIntervalTime * 2
 | 
						|
	}
 | 
						|
 | 
						|
	if err := ws.connect(); err != nil {
 | 
						|
		log.Error("[" + ws.WsUrl + "] " + err.Error())
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	ws.close = make(chan bool, 1)
 | 
						|
	ws.pingMessageBufferChan = make(chan []byte, 10)
 | 
						|
	ws.pongMessageBufferChan = make(chan []byte, 10)
 | 
						|
	ws.closeMessageBufferChan = make(chan []byte, 10)
 | 
						|
	ws.writeBufferChan = make(chan []byte, 10)
 | 
						|
	ws.reConnectLock = new(sync.Mutex)
 | 
						|
 | 
						|
	go ws.writeRequest()
 | 
						|
	go ws.receiveMessage()
 | 
						|
 | 
						|
	//if ws.ConnectSuccessAfterSendMessage != nil {
 | 
						|
	//	msg := ws.ConnectSuccessAfterSendMessage()
 | 
						|
	//	if msg != nil{
 | 
						|
	//		ws.SendMessage(msg)
 | 
						|
	//		log.ErrorLogMsg("[ws] " + ws.WsUrl + " execute the connect success after send message=" + string(msg))
 | 
						|
	//	}else {
 | 
						|
	//		log.ErrorLogMsg("执行重新连接后执行的登入函数[ws] " + ws.WsUrl + " ,send message=" + string(msg))
 | 
						|
	//	}
 | 
						|
	//}
 | 
						|
 | 
						|
	return ws
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) connect() error {
 | 
						|
	const maxRetries = 5               // 最大重试次数
 | 
						|
	const retryDelay = 2 * time.Second // 每次重试的延迟时间
 | 
						|
 | 
						|
	var wsConn *websocket.Conn
 | 
						|
	var resp *http.Response
 | 
						|
	var err error
 | 
						|
 | 
						|
	// 重试机制
 | 
						|
	for attempt := 1; attempt <= maxRetries; attempt++ {
 | 
						|
		if ws.ProxyUrl != "" {
 | 
						|
			proxy, err := url.Parse(ws.ProxyUrl)
 | 
						|
			if err == nil {
 | 
						|
				// log.Info("[ws][%s] proxy url:%s", zap.String("ws.WsUrl", ws.WsUrl))
 | 
						|
				dialer.Proxy = http.ProxyURL(proxy)
 | 
						|
			} else {
 | 
						|
				log.Error("[ws][" + ws.WsUrl + "] parse proxy url [" + ws.ProxyUrl + "] err: " + err.Error())
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// 尝试连接
 | 
						|
		wsConn, resp, err = dialer.Dial(ws.WsUrl, http.Header(ws.ReqHeaders))
 | 
						|
		if err != nil {
 | 
						|
			log.Error(fmt.Sprintf("[ws][%s] Dial attempt %d failed: %s", ws.WsUrl, attempt, err.Error()))
 | 
						|
 | 
						|
			// 如果开启了请求数据转储,打印响应信息
 | 
						|
			if ws.IsDump && resp != nil {
 | 
						|
				dumpData, _ := httputil.DumpResponse(resp, true)
 | 
						|
				log.Info(fmt.Sprintf("[ws][%s] Response dump: %s", ws.WsUrl, string(dumpData)))
 | 
						|
			}
 | 
						|
 | 
						|
			// 达到最大重试次数,返回错误
 | 
						|
			if attempt == maxRetries {
 | 
						|
				return fmt.Errorf("达到最大重试次数 [ws][%s]: %v", ws.WsUrl, err)
 | 
						|
			}
 | 
						|
 | 
						|
			// 等待一段时间后重试
 | 
						|
			time.Sleep(retryDelay)
 | 
						|
		} else {
 | 
						|
			// 连接成功,退出循环
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// 设置读取超时时间
 | 
						|
	wsConn.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
 | 
						|
 | 
						|
	// 如果开启了请求数据转储,打印响应信息
 | 
						|
	if ws.IsDump && resp != nil {
 | 
						|
		dumpData, _ := httputil.DumpResponse(resp, true)
 | 
						|
		log.Info(fmt.Sprintf("[ws][%s] Response dump: %s", ws.WsUrl, string(dumpData)))
 | 
						|
	}
 | 
						|
 | 
						|
	// 记录连接成功的日志
 | 
						|
	log.Info("[ws][" + ws.WsUrl + "] connected")
 | 
						|
	ws.c = wsConn
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) reconnect() {
 | 
						|
	ws.reConnectLock.Lock()
 | 
						|
	defer ws.reConnectLock.Unlock()
 | 
						|
 | 
						|
	ws.c.Close() //主动关闭一次
 | 
						|
	var err error
 | 
						|
	for retry := 1; retry <= 100; retry++ {
 | 
						|
		err = ws.connect()
 | 
						|
		if err != nil {
 | 
						|
			log.Error("[ws] [" + ws.WsUrl + "] websocket reconnect fail , " + err.Error())
 | 
						|
		} else {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		time.Sleep(ws.WsConfig.reconnectInterval * time.Duration(retry))
 | 
						|
	}
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		log.Error("[ws] [" + ws.WsUrl + "] retry connect 100 count fail , begin exiting. ")
 | 
						|
		ws.CloseWs()
 | 
						|
		if ws.ErrorHandleFunc != nil {
 | 
						|
			ws.ErrorHandleFunc(errors.New("retry reconnect fail"))
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		//re subscribe
 | 
						|
		if ws.ConnectSuccessAfterSendMessage != nil {
 | 
						|
			msg := ws.ConnectSuccessAfterSendMessage()
 | 
						|
			if msg != nil {
 | 
						|
				ws.SendMessage(msg)
 | 
						|
				//log.ErrorLogMsg("[ws] " + ws.WsUrl + " execute the connect success after send message=" + string(msg))
 | 
						|
			} else {
 | 
						|
				log.Error("执行重新连接后执行的登入函数[ws] " + ws.WsUrl + " ,send message=" + string(msg))
 | 
						|
			}
 | 
						|
			//ws.SendMessage(msg)
 | 
						|
			//log.InfoLog("[ws] [" + ws.WsUrl + "] execute the connect success after send message=" + string(msg))
 | 
						|
			time.Sleep(time.Second) //wait response
 | 
						|
		}
 | 
						|
 | 
						|
		for _, sub := range ws.subs {
 | 
						|
			log.Info("[ws] re subscribe: " + string(sub))
 | 
						|
			ws.SendMessage(sub)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) writeRequest() {
 | 
						|
	var (
 | 
						|
		heartTimer *time.Timer
 | 
						|
		err        error
 | 
						|
	)
 | 
						|
 | 
						|
	if ws.HeartbeatIntervalTime == 0 {
 | 
						|
		heartTimer = time.NewTimer(time.Hour)
 | 
						|
	} else {
 | 
						|
		heartTimer = time.NewTimer(ws.HeartbeatIntervalTime)
 | 
						|
	}
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ws.close:
 | 
						|
			log.Info("[ws][" + ws.WsUrl + "] close websocket , exiting write message goroutine.")
 | 
						|
			return
 | 
						|
		case d := <-ws.writeBufferChan:
 | 
						|
			err = ws.c.WriteMessage(websocket.TextMessage, d)
 | 
						|
		case d := <-ws.pingMessageBufferChan:
 | 
						|
			err = ws.c.WriteMessage(websocket.PingMessage, d)
 | 
						|
		case d := <-ws.pongMessageBufferChan:
 | 
						|
			err = ws.c.WriteMessage(websocket.PongMessage, d)
 | 
						|
		case d := <-ws.closeMessageBufferChan:
 | 
						|
			err = ws.c.WriteMessage(websocket.CloseMessage, d)
 | 
						|
		case <-heartTimer.C:
 | 
						|
			if ws.HeartbeatIntervalTime > 0 {
 | 
						|
				err = ws.c.WriteMessage(websocket.TextMessage, ws.HeartbeatData())
 | 
						|
				heartTimer.Reset(ws.HeartbeatIntervalTime)
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if err != nil {
 | 
						|
			log.Info("[ws][" + ws.WsUrl + "] write message " + err.Error())
 | 
						|
			//time.Sleep(time.Second)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) Subscribe(subEvent interface{}) error {
 | 
						|
	data, err := sonic.Marshal(subEvent)
 | 
						|
	if err != nil {
 | 
						|
		log.Error("[ws]["+ws.WsUrl+"] json encode error , ", zap.Error(err))
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	//ws.writeBufferChan <- data
 | 
						|
	ws.SendMessage(data)
 | 
						|
	ws.subs = append(ws.subs, data)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) SendMessage(msg []byte) {
 | 
						|
	defer func() {
 | 
						|
		//打印panic的错误信息
 | 
						|
		if err := recover(); err != nil { //产生了panic异常
 | 
						|
			fmt.Printf("SendMessage,panic: %s\r\n", err)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	ws.writeBufferChan <- msg
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) SendPingMessage(msg []byte) {
 | 
						|
	ws.pingMessageBufferChan <- msg
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) SendPongMessage(msg []byte) {
 | 
						|
	ws.pongMessageBufferChan <- msg
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) SendCloseMessage(msg []byte) {
 | 
						|
	ws.closeMessageBufferChan <- msg
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) SendJsonMessage(m interface{}) error {
 | 
						|
	data, err := sonic.Marshal(m)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	//ws.writeBufferChan <- data
 | 
						|
	ws.SendMessage(data)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) receiveMessage() {
 | 
						|
	//exit
 | 
						|
	ws.c.SetCloseHandler(func(code int, text string) error {
 | 
						|
		log.Info("[ws][" + ws.WsUrl + "] websocket exiting [code=" + utility.IntTostring(code) + " , text=" + text + "]")
 | 
						|
		//ws.CloseWs()
 | 
						|
		return nil
 | 
						|
	})
 | 
						|
 | 
						|
	ws.c.SetPongHandler(func(pong string) error {
 | 
						|
		// log.Info("[" + ws.WsUrl + "] received [pong] " + pong)
 | 
						|
		ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
 | 
						|
		return nil
 | 
						|
	})
 | 
						|
 | 
						|
	ws.c.SetPingHandler(func(ping string) error {
 | 
						|
		// log.Info("[" + ws.WsUrl + "] received [ping] " + ping)
 | 
						|
		ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
 | 
						|
		return nil
 | 
						|
	})
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ws.close:
 | 
						|
			log.Info("[ws][" + ws.WsUrl + "] close websocket , exiting receive message goroutine.")
 | 
						|
			return
 | 
						|
		default:
 | 
						|
			t, msg, err := ws.c.ReadMessage()
 | 
						|
			if err != nil {
 | 
						|
				log.Info("ws.c.ReadMessage[ws][" + ws.WsUrl + "] " + err.Error())
 | 
						|
				if ws.IsAutoReconnect {
 | 
						|
					log.Info("[ws][" + ws.WsUrl + "] Unexpected Closed , Begin Retry Connect.")
 | 
						|
					ws.reconnect()
 | 
						|
					continue
 | 
						|
				}
 | 
						|
 | 
						|
				if ws.ErrorHandleFunc != nil {
 | 
						|
					ws.ErrorHandleFunc(err)
 | 
						|
				}
 | 
						|
 | 
						|
				return
 | 
						|
			}
 | 
						|
			//			Log.Debug(string(msg))
 | 
						|
			ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
 | 
						|
			switch t {
 | 
						|
			case websocket.TextMessage:
 | 
						|
				ws.ProtoHandleFunc(msg)
 | 
						|
			case websocket.BinaryMessage:
 | 
						|
				if ws.DecompressFunc == nil {
 | 
						|
					ws.ProtoHandleFunc(msg)
 | 
						|
				} else {
 | 
						|
					msg2, err := ws.DecompressFunc(msg)
 | 
						|
					if err != nil {
 | 
						|
						log.Error("[ws] decompress error " + ws.WsUrl + err.Error())
 | 
						|
					} else {
 | 
						|
						ws.ProtoHandleFunc(msg2)
 | 
						|
					}
 | 
						|
				}
 | 
						|
				//	case websocket.CloseMessage:
 | 
						|
				//	ws.CloseWs()
 | 
						|
			default:
 | 
						|
				log.Error("[ws][" + ws.WsUrl + "] error websocket message type , content is " + string(msg))
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) CloseWs() {
 | 
						|
	defer func() {
 | 
						|
		//打印panic的错误信息
 | 
						|
		if err := recover(); err != nil { //产生了panic异常
 | 
						|
			fmt.Printf("CloseWs,panic: %s\r\n", err)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	//ws.close <- true
 | 
						|
	close(ws.close)
 | 
						|
	close(ws.writeBufferChan)
 | 
						|
	close(ws.closeMessageBufferChan)
 | 
						|
	close(ws.pingMessageBufferChan)
 | 
						|
	close(ws.pongMessageBufferChan)
 | 
						|
 | 
						|
	err := ws.c.Close()
 | 
						|
	if err != nil {
 | 
						|
		log.Error("CloseWs[ws]["+ws.WsUrl+"] close websocket error ,", zap.Error(err))
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (ws *WsConn) clearChannel(c chan struct{}) {
 | 
						|
	for {
 | 
						|
		if len(c) > 0 {
 | 
						|
			<-c
 | 
						|
		} else {
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 |