package zapx import ( amqp "github.com/rabbitmq/amqp091-go" ) // 根据 RabbitMQ 的连接创建一个 Writer。 func NewWriter(conn *amqp.Connection) (*MQWriter, error) { ch, err := conn.Channel() if err != nil { return nil, err } // 消息发送方和接收方都有义务创建队列 // 所以不应该也不需要检查队列是否存在 queue, err := ch.QueueDeclare(QueueLogRecord, true, false, false, false, nil) if err != nil { return nil, err } ws := &MQWriter{ ch: ch, queue: queue, } return ws, nil } // 将日志分发送到 RabbitMQ。 // 注意:发送的信息必须为 [api.LogRecord] 类型 json 序列化后的结果。 type MQWriter struct { ch *amqp.Channel queue amqp.Queue } // 发送消息到 mq。消息处理是异步的,重试机制由 mq 实现。所以发送方并不会等待消息被处理。 func (w *MQWriter) Write(bs []byte) (int, error) { p := amqp.Publishing{} WriteLogPublishBody(&p, bs) err := w.ch.Publish("", w.queue.Name, false, false, p) if err != nil { return 0, err } return len(bs), nil } // writer 本身不会缓存数据,因此不需要同步。 func (w *MQWriter) Sync() error { return nil } // 关闭底层的连接。 func (w *MQWriter) Close() error { return w.ch.Close() }