一.背景
经过这段时间以来的各项针对etcd defrag和snapshot的测试情况发现,数据量较大的情况下(3.8G),etcd在defrag和snapshot的时候,对应节点会存在超时情况,且defrag时该节点处于不可用状态。为了搞清楚为什么节点会出现这个情况,这里需要深入源码,学习了解其背后的逻辑。
二. defrag代码
defrag核心代码逻辑在etcd项目中server/storage/backend/backend.go文件中,主要是两个函数:defrag() 和 defragdb(odb, tmpdb *bolt.DB, limit int) error
1.defrag()函数
func (b *backend) defrag() error {
// 启动计时器以测量执行时间
now := time.Now()
// 设置一个标志以指示碎片整理正在进行中,并在方法完成时延迟将标志设置回0
isDefragActive.Set(1)
defer isDefragActive.Set(0)
// 锁定batchTx以确保没有人在使用之前的事务,并关闭正在进行的事务。
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()
// 锁定数据库以避免死锁。
b.mu.Lock()
defer b.mu.Unlock()
// 锁定并发读取请求,同时重置事务。
b.readTx.Lock()
defer b.readTx.Unlock()
// 提交当前事务并将其设置为nil。
b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil
// 在与数据库相同的目录中创建临时文件。
dir := filepath.Dir(b.db.Path())
temp, err := os.CreateTemp(dir, "db.tmp.*")
if err != nil {
return err
}
// 省略部分代码
... ...
// 使用BoltDB库打开临时数据库
tmpdb, err := bolt.Open(tdbp, 0600, &options)
if err != nil {
return err
}
// 记录数据库的当前大小和使用大小
dbp := b.db.Path()
size1, sizeInUse1 := b.Size(), b.SizeInUse()
if b.lg != nil {
b.lg.Info(
"defragmenting",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes", size1),
zap.String("current-db-size", humanize.Bytes(uint64(size1))),
zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
)
}
// 调用defragdb函数,通过将内容复制到临时数据库来进行数据库碎片整理
err = defragdb(b.db, tmpdb, defragLimit)
if err != nil {
tmpdb.Close()
// 如果碎片整理失败,则清理临时数据库并返回错误
if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
}
return err
}
// 关闭原始数据库
err = b.db.Close()
if err != nil {
b.lg.Fatal("failed to close database", zap.Error(err))
}
// 关闭临时数据库
err = tmpdb.Close()
if err != nil {
b.lg.Fatal("failed to close tmp database", zap.Error(err))
}
// 将临时数据库重命名为替换原始数据库。
err = os.Rename(tdbp, dbp)
if err != nil {
b.lg.Fatal("failed to rename tmp database", zap.Error(err))
}
// 打开重命名后的数据库
b.db, err = bolt.Open(dbp, 0600, b.bopts)
if err != nil {
b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
}
// 开始一个新的事务并将其分配给batchTx。
b.batchTx.tx = b.unsafeBegin(true)
// 重置读取事务并将其分配给readTx。
b.readTx.reset()
b.readTx.tx = b.unsafeBegin(false)
// 计算数据库的新大小和使用大小。
size := b.readTx.tx.Size()
db := b.readTx.tx.DB()
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
// 记录执行时间
took := time.Since(now)
defragSec.Observe(took.Seconds())
// 记录数据库的最终大小和使用大小
size2, sizeInUse2 := b.Size(), b.SizeInUse()
if b.lg != nil {
b.lg.Info(
"finished defragmenting directory",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes-diff", size2-size1),
zap.Int64("current-db-size-bytes", size2),
zap.String("current-db-size", humanize.Bytes(uint64(size2))),
zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
zap.Duration("took", took),
)
}
return nil
}
2. defragdb(odb, tmpdb *bolt.DB, limit int) error函数
// 该函数的入参是原来的数据库和新建的临时数据库,limit参数是单个事务中可以复制的最大数量(下面代码中会解释limit)
func defragdb(odb, tmpdb *bolt.DB, limit int) error {
// 在 tmpdb 临时数据库上打开一个写事务
tmptx, err := tmpdb.Begin(true)
if err != nil {
return err
}
defer func() {
if err != nil {
tmptx.Rollback()
}
}()
// 在 odb 原来的数据库上打开一个读事务,
tx, err := odb.Begin(false)
if err != nil {
return err
}
defer tx.Rollback()
// 创建与根桶相关联的游标
c := tx.Cursor()
// 遍历 odb 数据库中的每个桶,并将键值对复制到 tmpdb 数据库中相应的桶中。
count := 0
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
if b == nil {
return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
}
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
if berr != nil {
return berr
}
tmpb.FillPercent = 0.9 // for bucket2seq write in for each
// limit 参数确定在单个事务中可以复制的键值对的最大数量。如果键值对的数量超过了 limit,则在
// tmpdb 数据库上启动一个新的写事务,并继续该过程,直到所有的键值对都已复制
if err = b.ForEach(func(k, v []byte) error {
count++
if count > limit {
err = tmptx.Commit()
if err != nil {
return err
}
tmptx, err = tmpdb.Begin(true)
if err != nil {
return err
}
tmpb = tmptx.Bucket(next)
tmpb.FillPercent = 0.9 // for bucket2seq write in for each
count = 0
}
// 将键值对复制到 tmpdb 数据库中相应的桶中
return tmpb.Put(k, v)
}); err != nil {
return err
}
}
// 提交tmpdb 数据库上的写事务, 完成复制
return tmptx.Commit()
}
三. snapshot 代码
snapshot核心代码逻辑在etcd项目中server/etcdserver/api/maintenance.go文件和server/storage/backend/backend.go文件中,主要是三个函数:maintenanceServer.Snapshot函数,backend.Snapshot()函数和一个数据库开启读事务的db.begin函数。
1.maintenanceServer.Snapshot函数:
func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
// 读取存储版本信息,并将其存储在storageVersion变量中
ver := schema.ReadStorageVersion(ms.bg.Backend().ReadTx())
storageVersion := ""
if ver != nil {
storageVersion = ver.String()
}
// 创建一个数据库的快照,并将快照结果存储在snap变量中
snap := ms.bg.Backend().Snapshot()
// 创建了一个io.Pipe对象,并将其分为读端pr和写端pw。
// 这将用于将快照数据写入管道,并从管道中读取数据发送给客户端。
pr, pw := io.Pipe()
defer pr.Close()
// 启动一个goroutine,其中调用snap.WriteTo(pw)方法将快照数据写入管道,并在写入完成后关闭pipe写端。
go func() {
snap.WriteTo(pw)
if err := snap.Close(); err != nil {
ms.lg.Warn("failed to close snapshot", zap.Error(err))
}
pw.Close()
}()
// 用于计算快照数据的SHA256校验和。
h := sha256.New()
// 初始化发送数据量
sent := int64(0)
// 获取快照数据的总大小
total := snap.Size()
// 格式化为人类可读的格式
size := humanize.Bytes(uint64(total))
// 记录发送快照数据的开始时间,打印快照的总字节数、大小、存储版本等信息
start := time.Now()
ms.lg.Info("sending database snapshot to client",
zap.Int64("total-bytes", total),
zap.String("size", size),
zap.String("storage-version", storageVersion),
)
for total-sent > 0 {
// buffer仅用于保存从流中读取的字节
// 响应的大小是操作系统页面大小的倍数,在boltdb中获取
// e.g. 4*1024
// NOTE: 方法不会等待客户端接收到消息。因此,在发送操作之间,不能安全地重用缓冲区。
// 简而言之,这段注释提醒开发者,在发送操作之间不能重用缓冲区,
// 因为srv.Send方法不会等待消息被客户端接收完毕。
// 申请一个缓存区:32 * 1024
buf := make([]byte, snapshotSendBufferSize)
// 从管道中读取快照数据到缓存区buf中
n, err := io.ReadFull(pr, buf)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return togRPCError(err)
}
// 将读取的字节数加到sent变量中
sent += int64(n)
// 创建一个pb.SnapshotResponse对象,存储读取的快照数据、剩余字节数和存储版本
resp := &pb.SnapshotResponse{
RemainingBytes: uint64(total - sent),
Blob: buf[:n],
Version: storageVersion,
}
// 将pb.SnapshotResponse对象发送给客户端,并使用h.Write()方法将快照数据写入SHA256校验和的计算器中。
if err = srv.Send(resp); err != nil {
return togRPCError(err)
}
h.Write(buf[:n])
// 循环继续,直到发送完所有的快照数据
}
// 计算SHA256校验和
sha := h.Sum(nil)
// 省略部分代码,这段代码大概就是记录快照相关日志,创建pb.SnapshotResponse对象,发送response给客户端
... ...
return nil
}
2.backend.Snapshot()函数
func (b *backend) Snapshot() Snapshot {
// 提交一个批处理事务。
b.batchTx.Commit()
// 在互斥锁b.mu上获取一个读锁,并在函数执行结束后释放该锁。
b.mu.RLock()
defer b.mu.RUnlock()
// 开启一个只读事务
tx, err := b.db.Begin(false)
if err != nil {
b.lg.Fatal("failed to begin tx", zap.Error(err))
}
stopc, donec := make(chan struct{}), make(chan struct{})
// 计算事务的大小
dbBytes := tx.Size()
go func() {
defer close(donec)
// sendRateBytes是基于在1千兆比特/秒的连接上传输快照数据的假设下计算出来的,假设TCP的最小吞吐量为100MB/秒。
var sendRateBytes int64 = 100 * 1024 * 1024
// 根据事务大小计算出一个警告超时时间
warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
if warningTimeout < minSnapshotWarningTimeout {
warningTimeout = minSnapshotWarningTimeout
}
// 计算传输快照数据所需的时间,并在超过一定阈值时记录一个警告日志。
// 这个goroutine会不断循环,直到接收到stopc通道的信号。
start := time.Now()
ticker := time.NewTicker(warningTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
b.lg.Warn(
"snapshotting taking too long to transfer",
zap.Duration("taking", time.Since(start)),
zap.Int64("bytes", dbBytes),
zap.String("size", humanize.Bytes(uint64(dbBytes))),
)
case <-stopc:
snapshotTransferSec.Observe(time.Since(start).Seconds())
return
}
}
}()
// 返回一个指向snapshot结构体的指针,其中核心是包含了一个数据库读事务
return &snapshot{tx, stopc, donec}
}
四.总结
执行defrag命令的核心代码逻辑是创建一个临时数据库,再将原来的数据库里面的数据读出来,将其写入临时数据库,之后再用完成数据拷贝的临时数据库来替换原来的数据库,即完成defrag操作。
从代码逻辑中可以看出,在执行defrag时,会加很多的锁并重置事务,并且在完成数据拷贝之后,会关闭数据库,在替换数据库之后,再打开数据库,由此即解释了为什么defrag的时候,节点会超时,因为被锁住了并且还会经历数据库关闭和再次打开的操作。
执行snapshot命令的核心代码逻辑是开启一个数据库读事务,再通过pipe管道,将快照读取出来并发送给客户端,最终通过客户端保存到本地
从代码逻辑中可以看出,在执行snapshot的时候,首先将批处理事务提交,事务提交之后申请一个读锁(该锁是一个共享读锁,阻塞写),然后启动一个数据库读事务,将etcd的db数据读出来发送给客户端完成snapshot操作,由此可推断出,在测试环境遭遇的执行snapshot操作时,其他的写操作超时,是因为共享读锁未释放导致的,并且偶尔出现的读超时推测可能是IO带宽不够导致。
五.关于snapshot,defrag以及compact策略调整建议
建议降低snapshot频率,或者通过镜像集群的方式来对etcd进行备份:
1.snapshot用于etcd数据库备份,执行snapshot时会申请一个共享读锁,该锁会阻塞写操作,导致这段时间内的写操作全部超时。
建议去掉defrag操作,或者大幅度降低defrag频率:
1.defrag用于磁盘碎片整理,执行defrag时,会加很多的锁,这个时候执行defrag的节点处于几乎不可用状态,通过测试发现,compact之后,再写入数据,是优先将数据写到compact之后的磁盘空洞里面,不过写入性能在当前测试环境下面会降低35%左右(待后续找到高性能磁盘,再进行该测试)
2.鉴于性能降低,可在特殊时间执行一次defrag,例如每个星期或者每个月的业务波谷时,执行一次defrag。
建议增加compact频率:
1.根据当前现网环境对etcd的使用方式来看,过多的写操作和不适当的用法,导致etcd数据增长过快,且很多数据的存活时间并不长,因此建议增加compact频率,让数据总量保持一个恒定。