package mq import ( amqp "github.com/rabbitmq/amqp091-go" ) // MQClient 是 RabbitMQ 的封装结构体 type MQClient struct { Conn *amqp.Connection Channel *amqp.Channel } var MQ *MQClient // 全局复用实例 // InitMQ 初始化 RabbitMQ 连接和通道 func InitMQ(rabbitURL string) (*MQClient, error) { if rabbitURL == "" { rabbitURL = "amqp://guest:guest@localhost:5672/" } conn, err := amqp.Dial(rabbitURL) if err != nil { return nil, err } ch, err := conn.Channel() if err != nil { return nil, err } MQ = &MQClient{Conn: conn, Channel: ch} return MQ, nil } // Close 关闭连接和通道 func (c *MQClient) Close() { if c.Channel != nil { _ = c.Channel.Close() } if c.Conn != nil { _ = c.Conn.Close() } } // Publish 发布消息到指定队列 func (c *MQClient) Publish(queue string, body []byte) error { return c.Channel.Publish( "", // exchange queue, // routing key false, false, amqp.Publishing{ ContentType: "application/json", Body: body, }, ) } // DeclareQueue 声明队列 func (c *MQClient) DeclareQueue(name string) (amqp.Queue, error) { return c.Channel.QueueDeclare( name, true, // durable false, // auto-delete false, // exclusive false, // no-wait nil, // arguments ) } // Consume 注册消费者 func (c *MQClient) Consume(queue string) (<-chan amqp.Delivery, error) { return c.Channel.Consume( queue, "", // consumer tag true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) }