Files
hucan 8ae43bfba9
Some checks failed
Build / build (push) Has been cancelled
CodeQL / Analyze (go) (push) Has been cancelled
build / Build (push) Has been cancelled
GitHub Actions Mirror / mirror_to_gitee (push) Has been cancelled
GitHub Actions Mirror / mirror_to_gitlab (push) Has been cancelled
Issue Close Require / issue-close-require (push) Has been cancelled
Issue Check Inactive / issue-check-inactive (push) Has been cancelled
1
2025-06-29 00:36:30 +08:00

83 lines
1.6 KiB
Go

package mq
import (
amqp "github.com/rabbitmq/amqp091-go"
)
// MQClient 是 RabbitMQ 的封装结构体
type MQClient struct {
Conn *amqp.Connection
Channel *amqp.Channel
}
var MQ *MQClient // 全局复用实例
// InitMQ 初始化 RabbitMQ 连接和通道
func InitMQ(rabbitURL string) (*MQClient, error) {
if rabbitURL == "" {
rabbitURL = "amqp://guest:guest@localhost:5672/"
}
conn, err := amqp.Dial(rabbitURL)
if err != nil {
return nil, err
}
ch, err := conn.Channel()
if err != nil {
return nil, err
}
MQ = &MQClient{Conn: conn, Channel: ch}
return MQ, nil
}
// Close 关闭连接和通道
func (c *MQClient) Close() {
if c.Channel != nil {
_ = c.Channel.Close()
}
if c.Conn != nil {
_ = c.Conn.Close()
}
}
// Publish 发布消息到指定队列
func (c *MQClient) Publish(queue string, body []byte) error {
return c.Channel.Publish(
"", // exchange
queue, // routing key
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
},
)
}
// DeclareQueue 声明队列
func (c *MQClient) DeclareQueue(name string) (amqp.Queue, error) {
return c.Channel.QueueDeclare(
name,
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // arguments
)
}
// Consume 注册消费者
func (c *MQClient) Consume(queue string) (<-chan amqp.Delivery, error) {
return c.Channel.Consume(
queue,
"", // consumer tag
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
}