searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

ETCD的defrag和snapshot的源码解析以及相关策略调整建议

2023-09-01 03:51:52
156
0

一.背景

     经过这段时间以来的各项针对etcd defrag和snapshot的测试情况发现,数据量较大的情况下(3.8G),etcd在defrag和snapshot的时候,对应节点会存在超时情况,且defrag时该节点处于不可用状态。为了搞清楚为什么节点会出现这个情况,这里需要深入源码,学习了解其背后的逻辑。
 

二. defrag代码

    defrag核心代码逻辑在etcd项目中server/storage/backend/backend.go文件中,主要是两个函数:defrag() 和 defragdb(odb, tmpdb *bolt.DB, limit int) error
 
1.defrag()函数
func (b *backend) defrag() error {
    // 启动计时器以测量执行时间
    now := time.Now()
    // 设置一个标志以指示碎片整理正在进行中,并在方法完成时延迟将标志设置回0
    isDefragActive.Set(1)
    defer isDefragActive.Set(0)

    // 锁定batchTx以确保没有人在使用之前的事务,并关闭正在进行的事务。
    b.batchTx.LockOutsideApply()
    defer b.batchTx.Unlock()

    // 锁定数据库以避免死锁。
    b.mu.Lock()
    defer b.mu.Unlock()

    // 锁定并发读取请求,同时重置事务。
    b.readTx.Lock()
    defer b.readTx.Unlock()

    // 提交当前事务并将其设置为nil。
    b.batchTx.unsafeCommit(true)
    b.batchTx.tx = nil


    // 在与数据库相同的目录中创建临时文件。
    dir := filepath.Dir(b.db.Path())
    temp, err := os.CreateTemp(dir, "db.tmp.*")
    if err != nil {
        return err
    }
    
    // 省略部分代码
    ... ...
    
    // 使用BoltDB库打开临时数据库
    tmpdb, err := bolt.Open(tdbp, 0600, &options)
    if err != nil {
        return err
    }

    // 记录数据库的当前大小和使用大小
    dbp := b.db.Path()
    size1, sizeInUse1 := b.Size(), b.SizeInUse()
    if b.lg != nil {
        b.lg.Info(
            "defragmenting",
            zap.String("path", dbp),
            zap.Int64("current-db-size-bytes", size1),
            zap.String("current-db-size", humanize.Bytes(uint64(size1))),
            zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
            zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
        )
    }
    
    // 调用defragdb函数,通过将内容复制到临时数据库来进行数据库碎片整理
    err = defragdb(b.db, tmpdb, defragLimit)
    if err != nil {
        tmpdb.Close()
        // 如果碎片整理失败,则清理临时数据库并返回错误
        if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
            b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
        }
        return err
    }

    // 关闭原始数据库
    err = b.db.Close()
    if err != nil {
        b.lg.Fatal("failed to close database", zap.Error(err))
    }
    // 关闭临时数据库
    err = tmpdb.Close()
    if err != nil {
        b.lg.Fatal("failed to close tmp database", zap.Error(err))
    }
    // 将临时数据库重命名为替换原始数据库。 
    err = os.Rename(tdbp, dbp)
    if err != nil {
        b.lg.Fatal("failed to rename tmp database", zap.Error(err))
    }

    // 打开重命名后的数据库
    b.db, err = bolt.Open(dbp, 0600, b.bopts)
    if err != nil {
        b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
    }
    // 开始一个新的事务并将其分配给batchTx。
    b.batchTx.tx = b.unsafeBegin(true)

    // 重置读取事务并将其分配给readTx。
    b.readTx.reset()
    b.readTx.tx = b.unsafeBegin(false)

    // 计算数据库的新大小和使用大小。
    size := b.readTx.tx.Size()
    db := b.readTx.tx.DB()
    atomic.StoreInt64(&b.size, size)
    atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

    // 记录执行时间
    took := time.Since(now)
    defragSec.Observe(took.Seconds())

    // 记录数据库的最终大小和使用大小
    size2, sizeInUse2 := b.Size(), b.SizeInUse()
    if b.lg != nil {
        b.lg.Info(
            "finished defragmenting directory",
            zap.String("path", dbp),
            zap.Int64("current-db-size-bytes-diff", size2-size1),
            zap.Int64("current-db-size-bytes", size2),
            zap.String("current-db-size", humanize.Bytes(uint64(size2))),
            zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
            zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
            zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
            zap.Duration("took", took),
        )
    }
    return nil
}

 

2. defragdb(odb, tmpdb *bolt.DB, limit int) error函数
// 该函数的入参是原来的数据库和新建的临时数据库,limit参数是单个事务中可以复制的最大数量(下面代码中会解释limit)
func defragdb(odb, tmpdb *bolt.DB, limit int) error {
    // 在 tmpdb 临时数据库上打开一个写事务
    tmptx, err := tmpdb.Begin(true)
    if err != nil {
        return err
    }
    defer func() {
        if err != nil {
            tmptx.Rollback()
        }
    }()


    // 在 odb 原来的数据库上打开一个读事务,
    tx, err := odb.Begin(false)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 创建与根桶相关联的游标
    c := tx.Cursor()

    // 遍历 odb 数据库中的每个桶,并将键值对复制到 tmpdb 数据库中相应的桶中。
    count := 0
    for next, _ := c.First(); next != nil; next, _ = c.Next() {
        b := tx.Bucket(next)
        if b == nil {
            return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
        }

        tmpb, berr := tmptx.CreateBucketIfNotExists(next)
        if berr != nil {
            return berr
        }
        tmpb.FillPercent = 0.9 // for bucket2seq write in for each


        // limit 参数确定在单个事务中可以复制的键值对的最大数量。如果键值对的数量超过了 limit,则在 
        // tmpdb 数据库上启动一个新的写事务,并继续该过程,直到所有的键值对都已复制
        if err = b.ForEach(func(k, v []byte) error {
            count++
            if count > limit {
                err = tmptx.Commit()
                if err != nil {
                    return err
                }
                tmptx, err = tmpdb.Begin(true)
                if err != nil {
                    return err
                }
                tmpb = tmptx.Bucket(next)
                tmpb.FillPercent = 0.9 // for bucket2seq write in for each


                count = 0
            }
            // 将键值对复制到 tmpdb 数据库中相应的桶中
            return tmpb.Put(k, v)
        }); err != nil {
            return err
        }
    }

    // 提交tmpdb 数据库上的写事务, 完成复制
    return tmptx.Commit()
}

 

三. snapshot 代码

     snapshot核心代码逻辑在etcd项目中server/etcdserver/api/maintenance.go文件和server/storage/backend/backend.go文件中,主要是三个函数:maintenanceServer.Snapshot函数,backend.Snapshot()函数和一个数据库开启读事务的db.begin函数。
 
1.maintenanceServer.Snapshot函数:
func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
    // 读取存储版本信息,并将其存储在storageVersion变量中
    ver := schema.ReadStorageVersion(ms.bg.Backend().ReadTx())
    storageVersion := ""
    if ver != nil {
        storageVersion = ver.String()
    }
    // 创建一个数据库的快照,并将快照结果存储在snap变量中
    snap := ms.bg.Backend().Snapshot()
    // 创建了一个io.Pipe对象,并将其分为读端pr和写端pw。
    // 这将用于将快照数据写入管道,并从管道中读取数据发送给客户端。
    pr, pw := io.Pipe()


    defer pr.Close()

    // 启动一个goroutine,其中调用snap.WriteTo(pw)方法将快照数据写入管道,并在写入完成后关闭pipe写端。
    go func() {
        snap.WriteTo(pw)
        if err := snap.Close(); err != nil {
            ms.lg.Warn("failed to close snapshot", zap.Error(err))
        }
        pw.Close()
    }()


    // 用于计算快照数据的SHA256校验和。
    h := sha256.New()

    // 初始化发送数据量
    sent := int64(0)
    // 获取快照数据的总大小
    total := snap.Size()
    // 格式化为人类可读的格式
    size := humanize.Bytes(uint64(total))

    // 记录发送快照数据的开始时间,打印快照的总字节数、大小、存储版本等信息
    start := time.Now()
    ms.lg.Info("sending database snapshot to client",
        zap.Int64("total-bytes", total),
        zap.String("size", size),
        zap.String("storage-version", storageVersion),
    )
    for total-sent > 0 {
        // buffer仅用于保存从流中读取的字节
        // 响应的大小是操作系统页面大小的倍数,在boltdb中获取
        // e.g. 4*1024
        // NOTE: 方法不会等待客户端接收到消息。因此,在发送操作之间,不能安全地重用缓冲区。
        // 简而言之,这段注释提醒开发者,在发送操作之间不能重用缓冲区,
        // 因为srv.Send方法不会等待消息被客户端接收完毕。
        // 申请一个缓存区:32 * 1024
        buf := make([]byte, snapshotSendBufferSize)

        // 从管道中读取快照数据到缓存区buf中
        n, err := io.ReadFull(pr, buf)
        if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
            return togRPCError(err)
        }
        // 将读取的字节数加到sent变量中
        sent += int64(n)


        // 创建一个pb.SnapshotResponse对象,存储读取的快照数据、剩余字节数和存储版本
        resp := &pb.SnapshotResponse{
            RemainingBytes: uint64(total - sent),
            Blob:           buf[:n],
            Version:        storageVersion,
        }
        // 将pb.SnapshotResponse对象发送给客户端,并使用h.Write()方法将快照数据写入SHA256校验和的计算器中。
        if err = srv.Send(resp); err != nil {
            return togRPCError(err)
        }
        h.Write(buf[:n])
        // 循环继续,直到发送完所有的快照数据
    }
    // 计算SHA256校验和
    sha := h.Sum(nil)

    // 省略部分代码,这段代码大概就是记录快照相关日志,创建pb.SnapshotResponse对象,发送response给客户端
    ... ...
    return nil
}

 

2.backend.Snapshot()函数

func (b *backend) Snapshot() Snapshot {
    // 提交一个批处理事务。
    b.batchTx.Commit()

    // 在互斥锁b.mu上获取一个读锁,并在函数执行结束后释放该锁。
    b.mu.RLock()
    defer b.mu.RUnlock()
    // 开启一个只读事务
    tx, err := b.db.Begin(false)
    if err != nil {
        b.lg.Fatal("failed to begin tx", zap.Error(err))
    }


    stopc, donec := make(chan struct{}), make(chan struct{})
    // 计算事务的大小
    dbBytes := tx.Size()
    go func() {
        defer close(donec)
        // sendRateBytes是基于在1千兆比特/秒的连接上传输快照数据的假设下计算出来的,假设TCP的最小吞吐量为100MB/秒。
        var sendRateBytes int64 = 100 * 1024 * 1024
        // 根据事务大小计算出一个警告超时时间
        warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
        if warningTimeout < minSnapshotWarningTimeout {
            warningTimeout = minSnapshotWarningTimeout
        }
        
        // 计算传输快照数据所需的时间,并在超过一定阈值时记录一个警告日志。
        // 这个goroutine会不断循环,直到接收到stopc通道的信号。
        start := time.Now()
        ticker := time.NewTicker(warningTimeout)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                b.lg.Warn(
                    "snapshotting taking too long to transfer",
                    zap.Duration("taking", time.Since(start)),
                    zap.Int64("bytes", dbBytes),
                    zap.String("size", humanize.Bytes(uint64(dbBytes))),
                )


            case <-stopc:
                snapshotTransferSec.Observe(time.Since(start).Seconds())
                return
            }
        }
    }()

    // 返回一个指向snapshot结构体的指针,其中核心是包含了一个数据库读事务
    return &snapshot{tx, stopc, donec}
}

 

四.总结

 
    执行defrag命令的核心代码逻辑是创建一个临时数据库,再将原来的数据库里面的数据读出来,将其写入临时数据库,之后再用完成数据拷贝的临时数据库来替换原来的数据库,即完成defrag操作。
    从代码逻辑中可以看出,在执行defrag时,会加很多的锁并重置事务,并且在完成数据拷贝之后,会关闭数据库,在替换数据库之后,再打开数据库,由此即解释了为什么defrag的时候,节点会超时,因为被锁住了并且还会经历数据库关闭和再次打开的操作。
 
    执行snapshot命令的核心代码逻辑是开启一个数据库读事务,再通过pipe管道,将快照读取出来并发送给客户端,最终通过客户端保存到本地
    从代码逻辑中可以看出,在执行snapshot的时候,首先将批处理事务提交,事务提交之后申请一个读锁(该锁是一个共享读锁,阻塞写),然后启动一个数据库读事务,将etcd的db数据读出来发送给客户端完成snapshot操作,由此可推断出,在测试环境遭遇的执行snapshot操作时,其他的写操作超时,是因为共享读锁未释放导致的,并且偶尔出现的读超时推测可能是IO带宽不够导致。
 
 

五.关于snapshot,defrag以及compact策略调整建议

 
建议降低snapshot频率,或者通过镜像集群的方式来对etcd进行备份:
1.snapshot用于etcd数据库备份,执行snapshot时会申请一个共享读锁,该锁会阻塞写操作,导致这段时间内的写操作全部超时。
 
建议去掉defrag操作,或者大幅度降低defrag频率:
1.defrag用于磁盘碎片整理,执行defrag时,会加很多的锁,这个时候执行defrag的节点处于几乎不可用状态,通过测试发现,compact之后,再写入数据,是优先将数据写到compact之后的磁盘空洞里面,不过写入性能在当前测试环境下面会降低35%左右(待后续找到高性能磁盘,再进行该测试)
2.鉴于性能降低,可在特殊时间执行一次defrag,例如每个星期或者每个月的业务波谷时,执行一次defrag。
 
建议增加compact频率:
1.根据当前现网环境对etcd的使用方式来看,过多的写操作和不适当的用法,导致etcd数据增长过快,且很多数据的存活时间并不长,因此建议增加compact频率,让数据总量保持一个恒定。
0条评论
0 / 1000
李****材
5文章数
0粉丝数
李****材
5 文章 | 0 粉丝
原创

ETCD的defrag和snapshot的源码解析以及相关策略调整建议

2023-09-01 03:51:52
156
0

一.背景

     经过这段时间以来的各项针对etcd defrag和snapshot的测试情况发现,数据量较大的情况下(3.8G),etcd在defrag和snapshot的时候,对应节点会存在超时情况,且defrag时该节点处于不可用状态。为了搞清楚为什么节点会出现这个情况,这里需要深入源码,学习了解其背后的逻辑。
 

二. defrag代码

    defrag核心代码逻辑在etcd项目中server/storage/backend/backend.go文件中,主要是两个函数:defrag() 和 defragdb(odb, tmpdb *bolt.DB, limit int) error
 
1.defrag()函数
func (b *backend) defrag() error {
    // 启动计时器以测量执行时间
    now := time.Now()
    // 设置一个标志以指示碎片整理正在进行中,并在方法完成时延迟将标志设置回0
    isDefragActive.Set(1)
    defer isDefragActive.Set(0)

    // 锁定batchTx以确保没有人在使用之前的事务,并关闭正在进行的事务。
    b.batchTx.LockOutsideApply()
    defer b.batchTx.Unlock()

    // 锁定数据库以避免死锁。
    b.mu.Lock()
    defer b.mu.Unlock()

    // 锁定并发读取请求,同时重置事务。
    b.readTx.Lock()
    defer b.readTx.Unlock()

    // 提交当前事务并将其设置为nil。
    b.batchTx.unsafeCommit(true)
    b.batchTx.tx = nil


    // 在与数据库相同的目录中创建临时文件。
    dir := filepath.Dir(b.db.Path())
    temp, err := os.CreateTemp(dir, "db.tmp.*")
    if err != nil {
        return err
    }
    
    // 省略部分代码
    ... ...
    
    // 使用BoltDB库打开临时数据库
    tmpdb, err := bolt.Open(tdbp, 0600, &options)
    if err != nil {
        return err
    }

    // 记录数据库的当前大小和使用大小
    dbp := b.db.Path()
    size1, sizeInUse1 := b.Size(), b.SizeInUse()
    if b.lg != nil {
        b.lg.Info(
            "defragmenting",
            zap.String("path", dbp),
            zap.Int64("current-db-size-bytes", size1),
            zap.String("current-db-size", humanize.Bytes(uint64(size1))),
            zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
            zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
        )
    }
    
    // 调用defragdb函数,通过将内容复制到临时数据库来进行数据库碎片整理
    err = defragdb(b.db, tmpdb, defragLimit)
    if err != nil {
        tmpdb.Close()
        // 如果碎片整理失败,则清理临时数据库并返回错误
        if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
            b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
        }
        return err
    }

    // 关闭原始数据库
    err = b.db.Close()
    if err != nil {
        b.lg.Fatal("failed to close database", zap.Error(err))
    }
    // 关闭临时数据库
    err = tmpdb.Close()
    if err != nil {
        b.lg.Fatal("failed to close tmp database", zap.Error(err))
    }
    // 将临时数据库重命名为替换原始数据库。 
    err = os.Rename(tdbp, dbp)
    if err != nil {
        b.lg.Fatal("failed to rename tmp database", zap.Error(err))
    }

    // 打开重命名后的数据库
    b.db, err = bolt.Open(dbp, 0600, b.bopts)
    if err != nil {
        b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
    }
    // 开始一个新的事务并将其分配给batchTx。
    b.batchTx.tx = b.unsafeBegin(true)

    // 重置读取事务并将其分配给readTx。
    b.readTx.reset()
    b.readTx.tx = b.unsafeBegin(false)

    // 计算数据库的新大小和使用大小。
    size := b.readTx.tx.Size()
    db := b.readTx.tx.DB()
    atomic.StoreInt64(&b.size, size)
    atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

    // 记录执行时间
    took := time.Since(now)
    defragSec.Observe(took.Seconds())

    // 记录数据库的最终大小和使用大小
    size2, sizeInUse2 := b.Size(), b.SizeInUse()
    if b.lg != nil {
        b.lg.Info(
            "finished defragmenting directory",
            zap.String("path", dbp),
            zap.Int64("current-db-size-bytes-diff", size2-size1),
            zap.Int64("current-db-size-bytes", size2),
            zap.String("current-db-size", humanize.Bytes(uint64(size2))),
            zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
            zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
            zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
            zap.Duration("took", took),
        )
    }
    return nil
}

 

2. defragdb(odb, tmpdb *bolt.DB, limit int) error函数
// 该函数的入参是原来的数据库和新建的临时数据库,limit参数是单个事务中可以复制的最大数量(下面代码中会解释limit)
func defragdb(odb, tmpdb *bolt.DB, limit int) error {
    // 在 tmpdb 临时数据库上打开一个写事务
    tmptx, err := tmpdb.Begin(true)
    if err != nil {
        return err
    }
    defer func() {
        if err != nil {
            tmptx.Rollback()
        }
    }()


    // 在 odb 原来的数据库上打开一个读事务,
    tx, err := odb.Begin(false)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 创建与根桶相关联的游标
    c := tx.Cursor()

    // 遍历 odb 数据库中的每个桶,并将键值对复制到 tmpdb 数据库中相应的桶中。
    count := 0
    for next, _ := c.First(); next != nil; next, _ = c.Next() {
        b := tx.Bucket(next)
        if b == nil {
            return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
        }

        tmpb, berr := tmptx.CreateBucketIfNotExists(next)
        if berr != nil {
            return berr
        }
        tmpb.FillPercent = 0.9 // for bucket2seq write in for each


        // limit 参数确定在单个事务中可以复制的键值对的最大数量。如果键值对的数量超过了 limit,则在 
        // tmpdb 数据库上启动一个新的写事务,并继续该过程,直到所有的键值对都已复制
        if err = b.ForEach(func(k, v []byte) error {
            count++
            if count > limit {
                err = tmptx.Commit()
                if err != nil {
                    return err
                }
                tmptx, err = tmpdb.Begin(true)
                if err != nil {
                    return err
                }
                tmpb = tmptx.Bucket(next)
                tmpb.FillPercent = 0.9 // for bucket2seq write in for each


                count = 0
            }
            // 将键值对复制到 tmpdb 数据库中相应的桶中
            return tmpb.Put(k, v)
        }); err != nil {
            return err
        }
    }

    // 提交tmpdb 数据库上的写事务, 完成复制
    return tmptx.Commit()
}

 

三. snapshot 代码

     snapshot核心代码逻辑在etcd项目中server/etcdserver/api/maintenance.go文件和server/storage/backend/backend.go文件中,主要是三个函数:maintenanceServer.Snapshot函数,backend.Snapshot()函数和一个数据库开启读事务的db.begin函数。
 
1.maintenanceServer.Snapshot函数:
func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
    // 读取存储版本信息,并将其存储在storageVersion变量中
    ver := schema.ReadStorageVersion(ms.bg.Backend().ReadTx())
    storageVersion := ""
    if ver != nil {
        storageVersion = ver.String()
    }
    // 创建一个数据库的快照,并将快照结果存储在snap变量中
    snap := ms.bg.Backend().Snapshot()
    // 创建了一个io.Pipe对象,并将其分为读端pr和写端pw。
    // 这将用于将快照数据写入管道,并从管道中读取数据发送给客户端。
    pr, pw := io.Pipe()


    defer pr.Close()

    // 启动一个goroutine,其中调用snap.WriteTo(pw)方法将快照数据写入管道,并在写入完成后关闭pipe写端。
    go func() {
        snap.WriteTo(pw)
        if err := snap.Close(); err != nil {
            ms.lg.Warn("failed to close snapshot", zap.Error(err))
        }
        pw.Close()
    }()


    // 用于计算快照数据的SHA256校验和。
    h := sha256.New()

    // 初始化发送数据量
    sent := int64(0)
    // 获取快照数据的总大小
    total := snap.Size()
    // 格式化为人类可读的格式
    size := humanize.Bytes(uint64(total))

    // 记录发送快照数据的开始时间,打印快照的总字节数、大小、存储版本等信息
    start := time.Now()
    ms.lg.Info("sending database snapshot to client",
        zap.Int64("total-bytes", total),
        zap.String("size", size),
        zap.String("storage-version", storageVersion),
    )
    for total-sent > 0 {
        // buffer仅用于保存从流中读取的字节
        // 响应的大小是操作系统页面大小的倍数,在boltdb中获取
        // e.g. 4*1024
        // NOTE: 方法不会等待客户端接收到消息。因此,在发送操作之间,不能安全地重用缓冲区。
        // 简而言之,这段注释提醒开发者,在发送操作之间不能重用缓冲区,
        // 因为srv.Send方法不会等待消息被客户端接收完毕。
        // 申请一个缓存区:32 * 1024
        buf := make([]byte, snapshotSendBufferSize)

        // 从管道中读取快照数据到缓存区buf中
        n, err := io.ReadFull(pr, buf)
        if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
            return togRPCError(err)
        }
        // 将读取的字节数加到sent变量中
        sent += int64(n)


        // 创建一个pb.SnapshotResponse对象,存储读取的快照数据、剩余字节数和存储版本
        resp := &pb.SnapshotResponse{
            RemainingBytes: uint64(total - sent),
            Blob:           buf[:n],
            Version:        storageVersion,
        }
        // 将pb.SnapshotResponse对象发送给客户端,并使用h.Write()方法将快照数据写入SHA256校验和的计算器中。
        if err = srv.Send(resp); err != nil {
            return togRPCError(err)
        }
        h.Write(buf[:n])
        // 循环继续,直到发送完所有的快照数据
    }
    // 计算SHA256校验和
    sha := h.Sum(nil)

    // 省略部分代码,这段代码大概就是记录快照相关日志,创建pb.SnapshotResponse对象,发送response给客户端
    ... ...
    return nil
}

 

2.backend.Snapshot()函数

func (b *backend) Snapshot() Snapshot {
    // 提交一个批处理事务。
    b.batchTx.Commit()

    // 在互斥锁b.mu上获取一个读锁,并在函数执行结束后释放该锁。
    b.mu.RLock()
    defer b.mu.RUnlock()
    // 开启一个只读事务
    tx, err := b.db.Begin(false)
    if err != nil {
        b.lg.Fatal("failed to begin tx", zap.Error(err))
    }


    stopc, donec := make(chan struct{}), make(chan struct{})
    // 计算事务的大小
    dbBytes := tx.Size()
    go func() {
        defer close(donec)
        // sendRateBytes是基于在1千兆比特/秒的连接上传输快照数据的假设下计算出来的,假设TCP的最小吞吐量为100MB/秒。
        var sendRateBytes int64 = 100 * 1024 * 1024
        // 根据事务大小计算出一个警告超时时间
        warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
        if warningTimeout < minSnapshotWarningTimeout {
            warningTimeout = minSnapshotWarningTimeout
        }
        
        // 计算传输快照数据所需的时间,并在超过一定阈值时记录一个警告日志。
        // 这个goroutine会不断循环,直到接收到stopc通道的信号。
        start := time.Now()
        ticker := time.NewTicker(warningTimeout)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                b.lg.Warn(
                    "snapshotting taking too long to transfer",
                    zap.Duration("taking", time.Since(start)),
                    zap.Int64("bytes", dbBytes),
                    zap.String("size", humanize.Bytes(uint64(dbBytes))),
                )


            case <-stopc:
                snapshotTransferSec.Observe(time.Since(start).Seconds())
                return
            }
        }
    }()

    // 返回一个指向snapshot结构体的指针,其中核心是包含了一个数据库读事务
    return &snapshot{tx, stopc, donec}
}

 

四.总结

 
    执行defrag命令的核心代码逻辑是创建一个临时数据库,再将原来的数据库里面的数据读出来,将其写入临时数据库,之后再用完成数据拷贝的临时数据库来替换原来的数据库,即完成defrag操作。
    从代码逻辑中可以看出,在执行defrag时,会加很多的锁并重置事务,并且在完成数据拷贝之后,会关闭数据库,在替换数据库之后,再打开数据库,由此即解释了为什么defrag的时候,节点会超时,因为被锁住了并且还会经历数据库关闭和再次打开的操作。
 
    执行snapshot命令的核心代码逻辑是开启一个数据库读事务,再通过pipe管道,将快照读取出来并发送给客户端,最终通过客户端保存到本地
    从代码逻辑中可以看出,在执行snapshot的时候,首先将批处理事务提交,事务提交之后申请一个读锁(该锁是一个共享读锁,阻塞写),然后启动一个数据库读事务,将etcd的db数据读出来发送给客户端完成snapshot操作,由此可推断出,在测试环境遭遇的执行snapshot操作时,其他的写操作超时,是因为共享读锁未释放导致的,并且偶尔出现的读超时推测可能是IO带宽不够导致。
 
 

五.关于snapshot,defrag以及compact策略调整建议

 
建议降低snapshot频率,或者通过镜像集群的方式来对etcd进行备份:
1.snapshot用于etcd数据库备份,执行snapshot时会申请一个共享读锁,该锁会阻塞写操作,导致这段时间内的写操作全部超时。
 
建议去掉defrag操作,或者大幅度降低defrag频率:
1.defrag用于磁盘碎片整理,执行defrag时,会加很多的锁,这个时候执行defrag的节点处于几乎不可用状态,通过测试发现,compact之后,再写入数据,是优先将数据写到compact之后的磁盘空洞里面,不过写入性能在当前测试环境下面会降低35%左右(待后续找到高性能磁盘,再进行该测试)
2.鉴于性能降低,可在特殊时间执行一次defrag,例如每个星期或者每个月的业务波谷时,执行一次defrag。
 
建议增加compact频率:
1.根据当前现网环境对etcd的使用方式来看,过多的写操作和不适当的用法,导致etcd数据增长过快,且很多数据的存活时间并不长,因此建议增加compact频率,让数据总量保持一个恒定。
文章来自个人专栏
lrc的专栏
5 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0