Bladeren bron

添加日志公共方法

zob 2 maanden geleden
commit
5c8f1673d4
11 gewijzigde bestanden met toevoegingen van 1021 en 0 verwijderingen
  1. 3 0
      .gitignore
  2. 5 0
      config/config.go
  3. 21 0
      config/logger.go
  4. 57 0
      examples/logger/main.go
  5. 28 0
      go.mod
  6. 45 0
      go.sum
  7. 145 0
      logger/logger.go
  8. 48 0
      logger/zapx/const.go
  9. 498 0
      logger/zapx/encoder.go
  10. 122 0
      logger/zapx/encoder_test.go
  11. 49 0
      logger/zapx/rabbitmq.go

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+logs/
+*.exe
+*.log

+ 5 - 0
config/config.go

@@ -0,0 +1,5 @@
+package config
+
+type System struct {
+	Logger Logger `yaml:"logger"`
+}

+ 21 - 0
config/logger.go

@@ -0,0 +1,21 @@
+package config
+
+// Config 日志配置结构
+type Logger struct {
+	Level      string    `yaml:"level"`      // 日志级别
+	Filename   string    `yaml:"filename"`   // 日志文件路径
+	MaxSize    int       `yaml:"maxSize"`    // 每个日志文件最大尺寸(MB)
+	MaxBackups int       `yaml:"maxBackups"` // 保留的旧文件最大数量
+	MaxAge     int       `yaml:"maxAge"`     // 保留的旧文件最大天数
+	Compress   bool      `yaml:"compress"`   // 是否压缩旧文件
+	Console    bool      `yaml:"console"`    // 是否同时输出到控制台
+	ToMq       bool      `yaml:"toMq"`       // 是否输出到MQ
+	MqSetting  MqSetting `yaml:"mqSetting"`  // MQ配置
+}
+
+type MqSetting struct {
+	Host     string `yaml:"host"`
+	Port     int    `yaml:"port"`
+	Username string `yaml:"username"`
+	Password string `yaml:"password"`
+}

+ 57 - 0
examples/logger/main.go

@@ -0,0 +1,57 @@
+package main
+
+import (
+	"fmt"
+
+	"git.listensoft.net/tool/jspkit/config"
+	"git.listensoft.net/tool/jspkit/logger"
+	"go.uber.org/zap"
+)
+
+func main() {
+	// 读取配置文件
+	// data, err := os.ReadFile("examples/config/logger.yaml")
+	// if err != nil {
+	// 	fmt.Printf("读取配置文件失败: %v\n", err)
+	// 	return
+	// }
+
+	// 解析配置
+	cfg := config.System{
+		Logger: config.Logger{
+			Level:      "debug",
+			Filename:   "log.log",
+			MaxSize:    10,
+			MaxBackups: 10,
+			MaxAge:     10,
+			Compress:   true,
+			Console:    true,
+			ToMq:       true,
+			MqSetting: config.MqSetting{
+				Host:     "localhost",
+				Port:     5672,
+				Username: "guest",
+				Password: "guest",
+			},
+		},
+	}
+	// if err := yaml.Unmarshal(data, &cfg); err != nil {
+	// 	fmt.Printf("解析配置文件失败: %v\n", err)
+	// 	return
+	// }
+
+	// 初始化日志
+	if err := logger.InitLogger(&cfg); err != nil {
+		fmt.Printf("初始化日志失败: %v\n", err)
+		return
+	}
+
+	// 使用示例
+	logger.Debug("这是一条调试日志")
+	logger.Info("这是一条信息日志")
+	logger.Warn("这是一条警告日志")
+	logger.Error("这是一条错误日志",
+		zap.String("error_code", "500"),
+		zap.String("error_msg", "服务器内部错误"),
+	)
+}

+ 28 - 0
go.mod

@@ -0,0 +1,28 @@
+module git.listensoft.net/tool/jspkit
+
+go 1.23.0
+
+toolchain go1.23.2
+
+require (
+	github.com/streadway/amqp v1.1.0
+	go.uber.org/zap v1.27.0
+	gopkg.in/natefinch/lumberjack.v2 v2.2.1
+	gopkg.in/yaml.v3 v3.0.1
+)
+
+require (
+	git.listensoft.net/tool/clickhouse-server v0.0.3 // indirect
+	git.listensoft.net/tool/clickhouse-server/pkg/zapx v0.0.0-20250110075158-2e9febf04b57 // indirect
+	github.com/go-kratos/aegis v0.2.0 // indirect
+	github.com/go-kratos/kratos/v2 v2.8.0 // indirect
+	github.com/go-playground/form/v4 v4.2.1 // indirect
+	github.com/google/uuid v1.6.0 // indirect
+	github.com/gorilla/mux v1.8.1 // indirect
+	github.com/rabbitmq/amqp091-go v1.10.0 // indirect
+	go.uber.org/multierr v1.10.0 // indirect
+	golang.org/x/sys v0.25.0 // indirect
+	google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
+	google.golang.org/grpc v1.66.0 // indirect
+	google.golang.org/protobuf v1.34.2 // indirect
+)

+ 45 - 0
go.sum

@@ -0,0 +1,45 @@
+git.listensoft.net/tool/clickhouse-server v0.0.3 h1:umPVIEYZC3EivSHdDfg/6fJq+DLAdAYr5DYQETGeR6U=
+git.listensoft.net/tool/clickhouse-server v0.0.3/go.mod h1:CuW3TmIwpF1J8fsR0qD0c2rMS+CQMrH85r8X3bwZcpA=
+git.listensoft.net/tool/clickhouse-server/pkg/zapx v0.0.0-20250110075158-2e9febf04b57 h1:BJeZlpSpg/mcM5T8tivQ7WIhCBq1BGy8bbZDvBQjV8g=
+git.listensoft.net/tool/clickhouse-server/pkg/zapx v0.0.0-20250110075158-2e9febf04b57/go.mod h1:UA/XdSKODT45WMQgakqaiItiuhPfYm/1mAIpP1H4y/U=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/go-kratos/aegis v0.2.0 h1:dObzCDWn3XVjUkgxyBp6ZeWtx/do0DPZ7LY3yNSJLUQ=
+github.com/go-kratos/aegis v0.2.0/go.mod h1:v0R2m73WgEEYB3XYu6aE2WcMwsZkJ/Rzuf5eVccm7bI=
+github.com/go-kratos/kratos/v2 v2.8.0 h1:qr27WRTRrI3o4jzJzNKf4XVVoMYIqnQD+4ws1C46yhM=
+github.com/go-kratos/kratos/v2 v2.8.0/go.mod h1:+Vfe3FzF0d+BfMdajA11jT0rAyJWublRE/seZQNZVxE=
+github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
+github.com/go-playground/form/v4 v4.2.1 h1:HjdRDKO0fftVMU5epjPW2SOREcZ6/wLUzEobqUGJuPw=
+github.com/go-playground/form/v4 v4.2.1/go.mod h1:q1a2BY+AQUUzhl6xA/6hBetay6dEIhMHjgvJiGo6K7U=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
+github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
+github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
+github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
+github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
+github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
+github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
+go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
+go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
+go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
+go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
+golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
+golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
+google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
+google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
+google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
+google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
+gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 145 - 0
logger/logger.go

@@ -0,0 +1,145 @@
+package logger
+
+import (
+	"fmt"
+	"os"
+	"sync"
+
+	"git.listensoft.net/tool/jspkit/config"
+	"git.listensoft.net/tool/jspkit/logger/zapx"
+	amqp "github.com/rabbitmq/amqp091-go"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"gopkg.in/natefinch/lumberjack.v2"
+)
+
+var (
+	logger *zap.Logger
+	mqConn *amqp.Connection // 保存MQ连接
+	mu     sync.Mutex       // 用于保护mqConn
+)
+
+// InitLogger 初始化日志
+func InitLogger(config *config.System) error {
+	cfg := config.Logger
+	// 设置默认值
+	if cfg.MaxSize == 0 {
+		cfg.MaxSize = 100
+	}
+	if cfg.MaxBackups == 0 {
+		cfg.MaxBackups = 3
+	}
+	if cfg.MaxAge == 0 {
+		cfg.MaxAge = 28
+	}
+
+	// 配置lumberjack
+	hook := &lumberjack.Logger{
+		Filename:   cfg.Filename,
+		MaxSize:    cfg.MaxSize,
+		MaxBackups: cfg.MaxBackups,
+		MaxAge:     cfg.MaxAge,
+		Compress:   cfg.Compress,
+	}
+
+	// 设置日志级别
+	level := zap.InfoLevel
+	if cfg.Level != "" {
+		var err error
+		level, err = zapcore.ParseLevel(cfg.Level)
+		if err != nil {
+			return err
+		}
+	}
+
+	encoderConfig := zapcore.EncoderConfig{
+		TimeKey:        "time",
+		LevelKey:       "level",
+		NameKey:        "logger",
+		CallerKey:      "caller",
+		MessageKey:     "msg",
+		StacktraceKey:  "stacktrace",
+		LineEnding:     zapcore.DefaultLineEnding,
+		EncodeLevel:    zapcore.LowercaseLevelEncoder,
+		EncodeTime:     zapcore.ISO8601TimeEncoder,
+		EncodeDuration: zapcore.SecondsDurationEncoder,
+		EncodeCaller:   zapcore.ShortCallerEncoder,
+	}
+	cores := []zapcore.Core{}
+	if cfg.ToMq {
+		dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", cfg.MqSetting.Username, cfg.MqSetting.Password, cfg.MqSetting.Host, cfg.MqSetting.Port)
+		conn, err := amqp.Dial(dsn)
+		if err == nil {
+			mu.Lock()
+			mqConn = conn // 保存连接
+			mu.Unlock()
+
+			encoder := zapx.NewEncoder("taxrobot", zapx.CallerTrimmedPath)
+			wsc, _ := zapx.NewWriter(conn)
+
+			cores = append(cores, zapcore.NewCore(encoder, wsc, zap.InfoLevel))
+		}
+	}
+	// 创建Core
+	var basecore zapcore.Core
+	fileWriter := zapcore.AddSync(hook)
+	if cfg.Console {
+		// 同时输出到文件和控制台
+		consoleEncoder := zapcore.NewConsoleEncoder(encoderConfig)
+		fileEncoder := zapcore.NewJSONEncoder(encoderConfig)
+		basecore = zapcore.NewTee(
+			zapcore.NewCore(fileEncoder, fileWriter, level),
+			zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), level),
+		)
+	} else {
+		// 只输出到文件
+		fileEncoder := zapcore.NewJSONEncoder(encoderConfig)
+		basecore = zapcore.NewCore(fileEncoder, fileWriter, level)
+	}
+	cores = append(cores, basecore)
+	// 创建Logger
+	logger = zap.New(zapcore.NewTee(cores...), zap.AddCaller(), zap.AddCallerSkip(1))
+	return nil
+}
+
+// Debug 输出Debug级别日志
+func Debug(msg string, fields ...zap.Field) {
+	logger.Debug(msg, fields...)
+}
+
+// Info 输出Info级别日志
+func Info(msg string, fields ...zap.Field) {
+	logger.Info(msg, fields...)
+}
+
+// Warn 输出Warn级别日志
+func Warn(msg string, fields ...zap.Field) {
+	logger.Warn(msg, fields...)
+}
+
+// Error 输出Error级别日志
+func Error(msg string, fields ...zap.Field) {
+	logger.Error(msg, fields...)
+}
+
+// Fatal 输出Fatal级别日志
+func Fatal(msg string, fields ...zap.Field) {
+	logger.Fatal(msg, fields...)
+}
+
+// Close 关闭日志相关资源,包括MQ连接
+func Close() error {
+	if logger != nil {
+		logger.Sync()
+	}
+
+	mu.Lock()
+	defer mu.Unlock()
+
+	if mqConn != nil {
+		err := mqConn.Close()
+		mqConn = nil
+		return err
+	}
+	return nil
+}

+ 48 - 0
logger/zapx/const.go

@@ -0,0 +1,48 @@
+package zapx
+
+import "time"
+
+// 消息队列名称
+const (
+	QueueOperateRecord = "clickhouse_operation" // 操作记录队列
+	QueueLogRecord     = "clickhouse_log"       // 日志记录队列
+)
+
+// StringEntry 字符串键值对
+type StringEntry struct {
+	Key   string `json:"key"`   // 键
+	Value string `json:"value"` // 值
+}
+
+// FloatEntry 浮点数键值对
+type FloatEntry struct {
+	Key   string  `json:"key"`   // 键
+	Value float64 `json:"value"` // 值
+}
+
+// IntEntry 整数键值对
+type IntEntry struct {
+	Key   string `json:"key"`   // 键
+	Value int32  `json:"value"` // 值
+}
+
+// BoolEntry 布尔键值对
+type BoolEntry struct {
+	Key   string `json:"key"`   // 键
+	Value bool   `json:"value"` // 值
+}
+
+// LogRecord 定义结构化日志记录的数据结构。
+// 因为结构化日志允许重复的键值对,所以使用切片类型模拟字典。
+// 不支持嵌套类型。嵌套类型可使用 json path 做 key。如 "a.b[1].c"。
+type LogRecord struct {
+	EventTime     time.Time     `json:"eventTime"`     // 日志时间。必填。
+	Category      string        `json:"category"`      // 工程分类。必填。
+	Level         string        `json:"level"`         // 日志等级
+	Caller        string        `json:"caller"`        // 日志调用位置
+	Message       string        `json:"message"`       // 日志消息
+	StringEntries []StringEntry `json:"stringEntries"` // 字符串键值对
+	FloatEntries  []FloatEntry  `json:"floatEntries"`  // 浮点数键值对
+	IntEntries    []IntEntry    `json:"intEntries"`    // 整数键值对
+	BoolEntries   []BoolEntry   `json:"boolEntries"`   // 布尔键值对
+}

+ 498 - 0
logger/zapx/encoder.go

@@ -0,0 +1,498 @@
+// Package zapx 提供了 zap 日志的对接
+package zapx
+
+import (
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"time"
+
+	"go.uber.org/zap/buffer"
+	"go.uber.org/zap/zapcore"
+)
+
+var pool = buffer.NewPool()
+
+// 创建新的 encoder 对象
+func NewEncoder(category string, formatter CallerFormatFunc) zapcore.Encoder {
+	return &encoder{
+		key:              "",
+		category:         category,
+		formatCallerPath: formatter,
+		entries: &recordEntries{
+			stringEntries: []StringEntry{},
+			floatEntries:  []FloatEntry{},
+			intEntries:    []IntEntry{},
+			boolEntries:   []BoolEntry{},
+		},
+	}
+}
+
+// 调用方路径格式化函数类型
+type CallerFormatFunc = func(zapcore.EntryCaller) string
+
+// 格式化为全路径
+func CallerFullPath(c zapcore.EntryCaller) string { return c.FullPath() }
+
+// 格式化为相对路径
+func CallerTrimmedPath(c zapcore.EntryCaller) string { return c.TrimmedPath() }
+
+// 日志记录的参数
+type recordEntries struct {
+	stringEntries []StringEntry
+	floatEntries  []FloatEntry
+	intEntries    []IntEntry
+	boolEntries   []BoolEntry
+}
+
+// 因为 json number 都是 double 型数字,最高有效位只有 53 位。所以为了避免精度丢失,需要对 int64 类型数据做以下处理:
+//   - 能被 int32 表示的数据当作 int 数字处理。
+//   - 在 double 类型安全范围内的数字当作 float64 数字处理。
+//   - 无法用 double 表示的数字当作 string 处理。
+func addInt64(r *recordEntries, key string, value int64) {
+	switch {
+	case int64(int32(value)) == value:
+		r.intEntries = append(r.intEntries, IntEntry{Key: key, Value: int32(value)})
+	case int64(float64(value)) == value:
+		r.floatEntries = append(r.floatEntries, FloatEntry{Key: key, Value: float64(value)})
+	default:
+		r.stringEntries = append(r.stringEntries, StringEntry{Key: key, Value: strconv.FormatInt(value, 10)})
+	}
+}
+
+// uint 情况同上,转换规则为:
+//   - 能被 int32 表示的数据当作 int 数字处理。
+//   - 在 double 类型安全范围内的数字当作 float64 数字处理。
+//   - 无法用 double 表示的数字当作 string 处理。
+func addUint64(r *recordEntries, key string, value uint64) {
+	switch {
+	case uint64(int32(value)) == value:
+		r.intEntries = append(r.intEntries, IntEntry{Key: key, Value: int32(value)})
+	case uint64(float64(value)) == value:
+		r.floatEntries = append(r.floatEntries, FloatEntry{Key: key, Value: float64(value)})
+	default:
+		r.stringEntries = append(r.stringEntries, StringEntry{Key: key, Value: strconv.FormatUint(value, 10)})
+	}
+}
+
+type encoder struct {
+	key              string
+	category         string
+	formatCallerPath func(zapcore.EntryCaller) string
+	entries          *recordEntries
+}
+
+func (e *encoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error) {
+	var caller string
+	if entry.Caller != (zapcore.EntryCaller{}) {
+		caller = e.formatCallerPath(entry.Caller)
+	}
+
+	r := LogRecord{
+		EventTime:     entry.Time,
+		Category:      e.category,
+		Level:         entry.Level.String(),
+		Caller:        caller,
+		Message:       entry.Message,
+		StringEntries: e.entries.stringEntries,
+		FloatEntries:  e.entries.floatEntries,
+		IntEntries:    e.entries.intEntries,
+		BoolEntries:   e.entries.boolEntries,
+	}
+	enc := &objectEncoder{
+		key: e.key,
+		entries: &recordEntries{
+			stringEntries: []StringEntry{},
+			floatEntries:  []FloatEntry{},
+			intEntries:    []IntEntry{},
+			boolEntries:   []BoolEntry{},
+		},
+	}
+	for _, field := range fields {
+		field.AddTo(enc)
+	}
+	r.StringEntries = append(r.StringEntries, enc.entries.stringEntries...)
+	r.FloatEntries = append(r.FloatEntries, enc.entries.floatEntries...)
+	r.IntEntries = append(r.IntEntries, enc.entries.intEntries...)
+	r.BoolEntries = append(r.BoolEntries, enc.entries.boolEntries...)
+
+	bs, err := json.Marshal(r)
+	if err != nil {
+		return nil, err
+	}
+	buf := pool.Get()
+	_, err = buf.Write(bs)
+	if err != nil {
+		return nil, err
+	}
+	return buf, nil
+}
+func (e *encoder) AddArray(key string, marshaler zapcore.ArrayMarshaler) error {
+	return marshaler.MarshalLogArray(&sliceEncoder{key: addKey(e.key, key)})
+}
+
+func (e *encoder) AddObject(key string, marshaler zapcore.ObjectMarshaler) error {
+	return marshaler.MarshalLogObject(&objectEncoder{key: addKey(e.key, key)})
+}
+
+// 不支持纯二进制类型,用 Base64 编码一下。
+func (e *encoder) AddBinary(key string, value []byte) {
+	v := base64.StdEncoding.EncodeToString(value)
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: v})
+}
+
+func (e *encoder) AddByteString(key string, value []byte) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: string(value)})
+}
+
+func (e *encoder) AddBool(key string, value bool) {
+	e.entries.boolEntries = append(e.entries.boolEntries, BoolEntry{Key: addKey(e.key, key), Value: value})
+}
+
+func (e *encoder) AddComplex128(key string, value complex128) {
+	v := strconv.FormatComplex(value, 'f', -1, 128)
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: v})
+}
+
+func (e *encoder) AddComplex64(key string, value complex64) {
+	v := strconv.FormatComplex(complex128(value), 'f', -1, 64)
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: v})
+}
+
+func (e *encoder) AddDuration(key string, value time.Duration) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: value.String()})
+}
+
+func (e *encoder) AddFloat64(key string, value float64) {
+	e.entries.floatEntries = append(e.entries.floatEntries, FloatEntry{Key: addKey(e.key, key), Value: value})
+}
+
+func (e *encoder) AddFloat32(key string, value float32) {
+	e.entries.floatEntries = append(e.entries.floatEntries, FloatEntry{Key: addKey(e.key, key), Value: float64(value)})
+}
+
+func (e *encoder) AddInt(key string, value int) {
+	addInt64(e.entries, addKey(e.key, key), int64(value))
+}
+
+func (e *encoder) AddInt64(key string, value int64) {
+	addInt64(e.entries, addKey(e.key, key), value)
+}
+
+func (e *encoder) AddInt32(key string, value int32) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addKey(e.key, key), Value: value})
+}
+
+func (e *encoder) AddInt16(key string, value int16) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addKey(e.key, key), Value: int32(value)})
+}
+
+func (e *encoder) AddInt8(key string, value int8) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addKey(e.key, key), Value: int32(value)})
+}
+
+func (e *encoder) AddString(key string, value string) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: value})
+}
+
+func (e *encoder) AddTime(key string, value time.Time) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: value.Format(time.RFC3339Nano)})
+}
+
+func (e *encoder) AddUint(key string, value uint) {
+	addUint64(e.entries, addKey(e.key, key), uint64(value))
+}
+
+func (e *encoder) AddUint64(key string, value uint64) {
+	addUint64(e.entries, addKey(e.key, key), value)
+}
+
+func (e *encoder) AddUint32(key string, value uint32) {
+	addUint64(e.entries, addKey(e.key, key), uint64(value))
+}
+
+func (e *encoder) AddUint16(key string, value uint16) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addKey(e.key, key), Value: int32(value)})
+}
+
+func (e *encoder) AddUint8(key string, value uint8) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addKey(e.key, key), Value: int32(value)})
+}
+
+func (e *encoder) AddUintptr(key string, value uintptr) {
+	addUint64(e.entries, addKey(e.key, key), uint64(value))
+}
+
+func (e *encoder) AddReflected(key string, value any) error {
+	v := fmt.Sprint(value)
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: v})
+	return nil
+}
+
+func (e *encoder) OpenNamespace(key string) {
+	e.key = addKey(e.key, key)
+}
+
+func (e *encoder) Clone() zapcore.Encoder {
+	return &encoder{
+		key:              e.key,
+		category:         e.category,
+		formatCallerPath: e.formatCallerPath,
+		entries: &recordEntries{
+			stringEntries: e.entries.stringEntries,
+			floatEntries:  e.entries.floatEntries,
+			intEntries:    e.entries.intEntries,
+			boolEntries:   e.entries.boolEntries,
+		},
+	}
+}
+
+func addKey(parent, child string) string {
+	if parent == "" {
+		return child
+	}
+	return parent + "." + child
+}
+
+func addIndex(parent string, index int) string {
+	return fmt.Sprintf("%s[%d]", parent, index)
+}
+
+type objectEncoder struct {
+	key     string
+	entries *recordEntries
+}
+
+func (e *objectEncoder) AddArray(key string, marshaler zapcore.ArrayMarshaler) error {
+	return marshaler.MarshalLogArray(&sliceEncoder{key: addKey(e.key, key), entries: e.entries})
+}
+
+func (e *objectEncoder) AddObject(key string, marshaler zapcore.ObjectMarshaler) error {
+	return marshaler.MarshalLogObject(&objectEncoder{key: addKey(e.key, key), entries: e.entries})
+}
+
+// 不支持纯二进制类型,用 Base64 编码一下。
+func (e *objectEncoder) AddBinary(key string, value []byte) {
+	v := base64.StdEncoding.EncodeToString(value)
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: v})
+}
+
+func (e *objectEncoder) AddByteString(key string, value []byte) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: string(value)})
+}
+
+func (e *objectEncoder) AddBool(key string, value bool) {
+	e.entries.boolEntries = append(e.entries.boolEntries, BoolEntry{Key: addKey(e.key, key), Value: value})
+}
+
+func (e *objectEncoder) AddComplex128(key string, value complex128) {
+	v := strconv.FormatComplex(value, 'f', -1, 128)
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: v})
+}
+
+func (e *objectEncoder) AddComplex64(key string, value complex64) {
+	v := strconv.FormatComplex(complex128(value), 'f', -1, 64)
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: v})
+}
+
+func (e *objectEncoder) AddDuration(key string, value time.Duration) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: value.String()})
+}
+
+func (e *objectEncoder) AddFloat64(key string, value float64) {
+	e.entries.floatEntries = append(e.entries.floatEntries, FloatEntry{Key: addKey(e.key, key), Value: value})
+}
+
+func (e *objectEncoder) AddFloat32(key string, value float32) {
+	e.entries.floatEntries = append(e.entries.floatEntries, FloatEntry{Key: addKey(e.key, key), Value: float64(value)})
+}
+
+func (e *objectEncoder) AddInt(key string, value int) {
+	addInt64(e.entries, addKey(e.key, key), int64(value))
+}
+
+func (e *objectEncoder) AddInt64(key string, value int64) {
+	addInt64(e.entries, addKey(e.key, key), value)
+}
+
+func (e *objectEncoder) AddInt32(key string, value int32) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addKey(e.key, key), Value: value})
+}
+
+func (e *objectEncoder) AddInt16(key string, value int16) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addKey(e.key, key), Value: int32(value)})
+}
+
+func (e *objectEncoder) AddInt8(key string, value int8) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addKey(e.key, key), Value: int32(value)})
+}
+
+func (e *objectEncoder) AddString(key string, value string) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: value})
+}
+
+func (e *objectEncoder) AddTime(key string, value time.Time) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: value.Format(time.RFC3339Nano)})
+}
+
+func (e *objectEncoder) AddUint(key string, value uint) {
+	addUint64(e.entries, addKey(e.key, key), uint64(value))
+}
+
+func (e *objectEncoder) AddUint64(key string, value uint64) {
+	addUint64(e.entries, addKey(e.key, key), value)
+}
+
+func (e *objectEncoder) AddUint32(key string, value uint32) {
+	addUint64(e.entries, addKey(e.key, key), uint64(value))
+}
+
+func (e *objectEncoder) AddUint16(key string, value uint16) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addKey(e.key, key), Value: int32(value)})
+}
+
+func (e *objectEncoder) AddUint8(key string, value uint8) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addKey(e.key, key), Value: int32(value)})
+}
+
+func (e *objectEncoder) AddUintptr(key string, value uintptr) {
+	addUint64(e.entries, addKey(e.key, key), uint64(value))
+}
+
+func (e *objectEncoder) AddReflected(key string, value any) error {
+	v := fmt.Sprint(value)
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addKey(e.key, key), Value: v})
+	return nil
+}
+
+func (e *objectEncoder) OpenNamespace(key string) {
+	e.key = addKey(e.key, key)
+}
+
+type sliceEncoder struct {
+	key     string
+	index   int
+	entries *recordEntries
+}
+
+func (e *sliceEncoder) AppendBool(value bool) {
+	e.entries.boolEntries = append(e.entries.boolEntries, BoolEntry{Key: addIndex(e.key, e.index), Value: value})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendByteString(value []byte) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addIndex(e.key, e.index), Value: string(value)})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendComplex128(value complex128) {
+	v := strconv.FormatComplex(value, 'f', -1, 128)
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addIndex(e.key, e.index), Value: v})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendComplex64(value complex64) {
+	v := strconv.FormatComplex(complex128(value), 'f', -1, 64)
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addIndex(e.key, e.index), Value: v})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendFloat64(value float64) {
+	e.entries.floatEntries = append(e.entries.floatEntries, FloatEntry{Key: addIndex(e.key, e.index), Value: value})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendFloat32(value float32) {
+	e.entries.floatEntries = append(e.entries.floatEntries, FloatEntry{Key: addIndex(e.key, e.index), Value: float64(value)})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendInt(value int) {
+	addInt64(e.entries, addIndex(e.key, e.index), int64(value))
+	e.index++
+}
+
+func (e *sliceEncoder) AppendInt64(value int64) {
+	addInt64(e.entries, addIndex(e.key, e.index), value)
+	e.index++
+}
+
+func (e *sliceEncoder) AppendInt32(value int32) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addIndex(e.key, e.index), Value: value})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendInt16(value int16) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addIndex(e.key, e.index), Value: int32(value)})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendInt8(value int8) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addIndex(e.key, e.index), Value: int32(value)})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendString(value string) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addIndex(e.key, e.index), Value: value})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendUint(value uint) {
+	addUint64(e.entries, addIndex(e.key, e.index), uint64(value))
+	e.index++
+}
+
+func (e *sliceEncoder) AppendUint64(value uint64) {
+	addUint64(e.entries, addIndex(e.key, e.index), value)
+	e.index++
+}
+
+func (e *sliceEncoder) AppendUint32(value uint32) {
+	addUint64(e.entries, addIndex(e.key, e.index), uint64(value))
+	e.index++
+}
+
+func (e *sliceEncoder) AppendUint16(value uint16) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addIndex(e.key, e.index), Value: int32(value)})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendUint8(value uint8) {
+	e.entries.intEntries = append(e.entries.intEntries, IntEntry{Key: addIndex(e.key, e.index), Value: int32(value)})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendUintptr(value uintptr) {
+	addUint64(e.entries, addIndex(e.key, e.index), uint64(value))
+	e.index++
+}
+
+func (e *sliceEncoder) AppendDuration(value time.Duration) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addIndex(e.key, e.index), Value: value.String()})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendTime(value time.Time) {
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addIndex(e.key, e.index), Value: value.Format(time.RFC3339Nano)})
+	e.index++
+}
+
+func (e *sliceEncoder) AppendArray(value zapcore.ArrayMarshaler) error {
+	err := value.MarshalLogArray(&sliceEncoder{key: addIndex(e.key, e.index), entries: e.entries})
+	e.index++
+	return err
+}
+
+func (e *sliceEncoder) AppendObject(value zapcore.ObjectMarshaler) error {
+	err := value.MarshalLogObject(&objectEncoder{key: addIndex(e.key, e.index), entries: e.entries})
+	e.index++
+	return err
+}
+
+func (e *sliceEncoder) AppendReflected(value any) error {
+	v := fmt.Sprint(value)
+	e.entries.stringEntries = append(e.entries.stringEntries, StringEntry{Key: addIndex(e.key, e.index), Value: v})
+	e.index++
+	return nil
+}

+ 122 - 0
logger/zapx/encoder_test.go

@@ -0,0 +1,122 @@
+package zapx
+
+import (
+	"bytes"
+	"testing"
+	"time"
+
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+var (
+	eventTime = time.Date(2006, 1, 2, 15, 4, 5, 0, time.UTC)
+	caller    = func(zapcore.EntryCaller) string { return "" }
+)
+
+func TestEncodeNestedObject(t *testing.T) {
+	entry := zapcore.Entry{
+		Level:      zapcore.InfoLevel,
+		Time:       eventTime,
+		LoggerName: "",
+		Message:    "msg",
+		Caller:     zapcore.EntryCaller{},
+		Stack:      "",
+	}
+	fields := []zapcore.Field{
+		zap.Object("a", zapcore.ObjectMarshalerFunc(func(oe zapcore.ObjectEncoder) error {
+			oe.AddString("a1", "v1")
+			oe.AddArray("b", zapcore.ArrayMarshalerFunc(func(ae zapcore.ArrayEncoder) error {
+				ae.AppendString("b1")
+				ae.AppendObject(zapcore.ObjectMarshalerFunc(func(oe zapcore.ObjectEncoder) error {
+					oe.AddString("c1", "v2")
+					return nil
+				}))
+				return nil
+			}))
+			return nil
+		})),
+	}
+
+	encoder := NewEncoder("category", caller)
+	buf, err := encoder.EncodeEntry(entry, fields)
+	if err != nil {
+		t.Fatal(err)
+	}
+	s := buf.String()
+	want := `{"eventTime":"2006-01-02T15:04:05Z","category":"category","level":"info","caller":"","message":"msg","stringEntries":[{"key":"a.a1","value":"v1"},{"key":"a.b[0]","value":"b1"},{"key":"a.b[1].c1","value":"v2"}],"floatEntries":[],"intEntries":[],"boolEntries":[]}`
+	if s != want {
+		t.Errorf("log mismatch\n got %s\nwant %s", s, want)
+	}
+}
+
+func TestEncodeInt(t *testing.T) {
+	entry := zapcore.Entry{
+		Level:      zapcore.InfoLevel,
+		Time:       eventTime,
+		LoggerName: "",
+		Message:    "msg",
+		Caller:     zapcore.EntryCaller{},
+		Stack:      "",
+	}
+	fields := []zapcore.Field{
+		zap.Int64("a1", 1),
+		zap.Int64("b1", 2147483648),       // 1 << 33 = max int32 + 1
+		zap.Int64("c1", 9007199254740993), // 1<<53+1
+		zap.Uint64("a2", 1),
+		zap.Uint64("b2", 2147483648),       // 1 << 33 = max int32 + 1
+		zap.Uint64("c2", 9007199254740993), // 1<<53+1
+	}
+
+	encoder := NewEncoder("category", caller)
+	buf, err := encoder.EncodeEntry(entry, fields)
+	if err != nil {
+		t.Fatal(err)
+	}
+	s := buf.String()
+	want := `{"eventTime":"2006-01-02T15:04:05Z","category":"category","level":"info","caller":"","message":"msg","stringEntries":[{"key":"c1","value":"9007199254740993"},{"key":"c2","value":"9007199254740993"}],"floatEntries":[{"key":"b1","value":2147483648},{"key":"b2","value":2147483648}],"intEntries":[{"key":"a1","value":1},{"key":"a2","value":1}],"boolEntries":[]}`
+	if s != want {
+		t.Errorf("log mismatch\n got %s\nwant %s", s, want)
+	}
+}
+
+func TestWith(t *testing.T) {
+	var buf bytes.Buffer
+	var clock mockClock
+	encoder := NewEncoder("category", caller)
+
+	core := zapcore.NewCore(encoder, zapcore.AddSync(&buf), zapcore.InfoLevel)
+
+	logger := zap.New(core, zap.WithClock(clock))
+	logger = logger.With(zap.String("a", "b"))
+	logger.Info("hello", zap.String("name", "world"))
+
+	want := `{"eventTime":"2006-01-02T15:04:05Z","category":"category","level":"info","caller":"","message":"hello","stringEntries":[{"key":"a","value":"b"},{"key":"name","value":"world"}],"floatEntries":[],"intEntries":[],"boolEntries":[]}`
+	s := buf.String()
+	if s != want {
+		t.Errorf("log mismatch\n got %s\nwant %s", s, want)
+	}
+}
+
+func TestNameSpace(t *testing.T) {
+	var buf bytes.Buffer
+	var clock mockClock
+	encoder := NewEncoder("category", caller)
+
+	core := zapcore.NewCore(encoder, zapcore.AddSync(&buf), zapcore.InfoLevel)
+
+	logger := zap.New(core, zap.WithClock(clock))
+	logger = logger.With(zap.Namespace("a"), zap.String("b", "c"))
+	logger.Info("msg", zap.String("d", "e"))
+
+	want := `{"eventTime":"2006-01-02T15:04:05Z","category":"category","level":"info","caller":"","message":"msg","stringEntries":[{"key":"a.b","value":"c"},{"key":"a.d","value":"e"}],"floatEntries":[],"intEntries":[],"boolEntries":[]}`
+	s := buf.String()
+	if s != want {
+		t.Errorf("log mismatch\n got %s\nwant %s", s, want)
+	}
+}
+
+type mockClock struct{}
+
+func (c mockClock) Now() time.Time                       { return eventTime }
+func (c mockClock) NewTicker(time.Duration) *time.Ticker { panic("not implemented") }

+ 49 - 0
logger/zapx/rabbitmq.go

@@ -0,0 +1,49 @@
+package zapx
+
+import (
+	"git.listensoft.net/tool/clickhouse-server/api/v1"
+	amqp "github.com/rabbitmq/amqp091-go"
+)
+
+// 根据 RabbitMQ 的连接创建一个 Writer。
+func NewWriter(conn *amqp.Connection) (*MQWriter, error) {
+	ch, err := conn.Channel()
+	if err != nil {
+		return nil, err
+	}
+	// 消息发送方和接收方都有义务创建队列
+	// 所以不应该也不需要检查队列是否存在
+	queue, err := ch.QueueDeclare(QueueLogRecord, true, false, false, false, nil)
+	if err != nil {
+		return nil, err
+	}
+	ws := &MQWriter{
+		ch:    ch,
+		queue: queue,
+	}
+	return ws, nil
+}
+
+// 将日志分发送到 RabbitMQ。
+// 注意:发送的信息必须为 [api.LogRecord] 类型 json 序列化后的结果。
+type MQWriter struct {
+	ch    *amqp.Channel
+	queue amqp.Queue
+}
+
+// 发送消息到 mq。消息处理是异步的,重试机制由 mq 实现。所以发送方并不会等待消息被处理。
+func (w *MQWriter) Write(bs []byte) (int, error) {
+	p := amqp.Publishing{}
+	api.WriteLogPublishBody(&p, bs)
+	err := w.ch.Publish("", w.queue.Name, false, false, p)
+	if err != nil {
+		return 0, err
+	}
+	return len(bs), nil
+}
+
+// writer 本身不会缓存数据,因此不需要同步。
+func (w *MQWriter) Sync() error { return nil }
+
+// 关闭底层的连接。
+func (w *MQWriter) Close() error { return w.ch.Close() }