123456789101112131415161718192021222324252627282930313233343536373839404142434445464748 |
- package zapx
- import (
- amqp "github.com/rabbitmq/amqp091-go"
- )
- // 根据 RabbitMQ 的连接创建一个 Writer。
- func NewWriter(conn *amqp.Connection) (*MQWriter, error) {
- ch, err := conn.Channel()
- if err != nil {
- return nil, err
- }
- // 消息发送方和接收方都有义务创建队列
- // 所以不应该也不需要检查队列是否存在
- queue, err := ch.QueueDeclare(QueueLogRecord, true, false, false, false, nil)
- if err != nil {
- return nil, err
- }
- ws := &MQWriter{
- ch: ch,
- queue: queue,
- }
- return ws, nil
- }
- // 将日志分发送到 RabbitMQ。
- // 注意:发送的信息必须为 [api.LogRecord] 类型 json 序列化后的结果。
- type MQWriter struct {
- ch *amqp.Channel
- queue amqp.Queue
- }
- // 发送消息到 mq。消息处理是异步的,重试机制由 mq 实现。所以发送方并不会等待消息被处理。
- func (w *MQWriter) Write(bs []byte) (int, error) {
- p := amqp.Publishing{}
- WriteLogPublishBody(&p, bs)
- err := w.ch.Publish("", w.queue.Name, false, false, p)
- if err != nil {
- return 0, err
- }
- return len(bs), nil
- }
- // writer 本身不会缓存数据,因此不需要同步。
- func (w *MQWriter) Sync() error { return nil }
- // 关闭底层的连接。
- func (w *MQWriter) Close() error { return w.ch.Close() }
|