rabbitmq.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package zapx
  2. import (
  3. amqp "github.com/rabbitmq/amqp091-go"
  4. )
  5. // 根据 RabbitMQ 的连接创建一个 Writer。
  6. func NewWriter(conn *amqp.Connection) (*MQWriter, error) {
  7. ch, err := conn.Channel()
  8. if err != nil {
  9. return nil, err
  10. }
  11. // 消息发送方和接收方都有义务创建队列
  12. // 所以不应该也不需要检查队列是否存在
  13. queue, err := ch.QueueDeclare(QueueLogRecord, true, false, false, false, nil)
  14. if err != nil {
  15. return nil, err
  16. }
  17. ws := &MQWriter{
  18. ch: ch,
  19. queue: queue,
  20. }
  21. return ws, nil
  22. }
  23. // 将日志分发送到 RabbitMQ。
  24. // 注意:发送的信息必须为 [api.LogRecord] 类型 json 序列化后的结果。
  25. type MQWriter struct {
  26. ch *amqp.Channel
  27. queue amqp.Queue
  28. }
  29. // 发送消息到 mq。消息处理是异步的,重试机制由 mq 实现。所以发送方并不会等待消息被处理。
  30. func (w *MQWriter) Write(bs []byte) (int, error) {
  31. p := amqp.Publishing{}
  32. WriteLogPublishBody(&p, bs)
  33. err := w.ch.Publish("", w.queue.Name, false, false, p)
  34. if err != nil {
  35. return 0, err
  36. }
  37. return len(bs), nil
  38. }
  39. // writer 本身不会缓存数据,因此不需要同步。
  40. func (w *MQWriter) Sync() error { return nil }
  41. // 关闭底层的连接。
  42. func (w *MQWriter) Close() error { return w.ch.Close() }