rabbitmq.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package zapx
  2. import (
  3. "git.listensoft.net/tool/clickhouse-server/api/v1"
  4. amqp "github.com/rabbitmq/amqp091-go"
  5. )
  6. // 根据 RabbitMQ 的连接创建一个 Writer。
  7. func NewWriter(conn *amqp.Connection) (*MQWriter, error) {
  8. ch, err := conn.Channel()
  9. if err != nil {
  10. return nil, err
  11. }
  12. // 消息发送方和接收方都有义务创建队列
  13. // 所以不应该也不需要检查队列是否存在
  14. queue, err := ch.QueueDeclare(QueueLogRecord, true, false, false, false, nil)
  15. if err != nil {
  16. return nil, err
  17. }
  18. ws := &MQWriter{
  19. ch: ch,
  20. queue: queue,
  21. }
  22. return ws, nil
  23. }
  24. // 将日志分发送到 RabbitMQ。
  25. // 注意:发送的信息必须为 [api.LogRecord] 类型 json 序列化后的结果。
  26. type MQWriter struct {
  27. ch *amqp.Channel
  28. queue amqp.Queue
  29. }
  30. // 发送消息到 mq。消息处理是异步的,重试机制由 mq 实现。所以发送方并不会等待消息被处理。
  31. func (w *MQWriter) Write(bs []byte) (int, error) {
  32. p := amqp.Publishing{}
  33. api.WriteLogPublishBody(&p, bs)
  34. err := w.ch.Publish("", w.queue.Name, false, false, p)
  35. if err != nil {
  36. return 0, err
  37. }
  38. return len(bs), nil
  39. }
  40. // writer 本身不会缓存数据,因此不需要同步。
  41. func (w *MQWriter) Sync() error { return nil }
  42. // 关闭底层的连接。
  43. func (w *MQWriter) Close() error { return w.ch.Close() }