# 一、项目概览 Dragonboat 是纯 Go 实现的(multi-group)Raft 库。 为应用屏蔽 Raft 复杂性,提供易于使用的 NodeH

作者:得物技术日期:2025/11/27

一、项目概览

Dragonboat 是纯 Go 实现的(multi-group)Raft 库。

为应用屏蔽 Raft 复杂性,提供易于使用的 NodeHost 和状态机接口。该库(自称)有如下特点:

  • 高吞吐、流水线化、批处理;
  • 提供了内存/磁盘状态机多种实现;
  • 提供了 ReadIndex、成员变更、Leader转移等管理端API;
  • 默认使用 Pebble 作为 存储后端。

本次代码串讲以V3的稳定版本为基础,不包括GitHub上v4版本内容。

二、整体架构

三、LogDB 统一存储

LogDB 模块是 Dragonboat 的核心持久化存储层,虽然模块名字有Log,但是它囊括了所有和存储相关的API,负责管理 Raft 协议的所有持久化数据,包括:

Raft状态 (RaftState)

Raft内部状态变更的集合结构

包括但不限于:

  • ClusterID/NodeID: 节点ID
  • RaftState: Raft任期、投票情况、commit进度
  • EntriesToSave:Raft提案日志数据
  • Snapshot:快照元数据(包括快照文件路径,快照大小,快照对应的提案Index,快照对应的Raft任期等信息)
  • Messages:发给其他节点的Raft消息
  • ReadyToReads:ReadIndex就绪的请求

引导信息 (Bootstrap)

1type Bootstrap struct {
2    Addresses map[uint64]string // 初始集群成员
3    Join      bool
4    Type      StateMachineType
5}
6

ILogDB的API如下:

1type ILogDB interface {
2
3
4    BinaryFormat() uint32 // 返回支持的二进制格式版本号
5
6
7    ListNodeInfo() ([]NodeInfo, error) // 列出 LogDB 中所有可用的节点信息
8
9
10    // 存储集群节点的初始化配置信息,包括是否加入集群、状态机类型等
11    SaveBootstrapInfo(clusterID uint64, nodeID uint64, bootstrap pb.Bootstrap) error
12
13
14    // 获取保存的引导信息
15    GetBootstrapInfo(clusterID uint64, nodeID uint64) (pb.Bootstrap, error)
16
17
18    // 原子性保存 Raft 状态、日志条目和快照元数据
19    SaveRaftState(updates []pb.Update, shardID uint64) error
20
21
22    // 迭代读取指定范围内的连续日志条目
23    IterateEntries(ents []pb.Entry, size uint64, clusterID uint64, nodeID uint64, 
24                   low uint64, high uint64, maxSize uint64) ([]pb.Entry, uint64, error)
25
26
27    // 读取持久化的 Raft 状态
28    ReadRaftState(clusterID uint64, nodeID uint64, lastIndex uint64) (RaftState, error)
29
30
31    // 删除指定索引之前的所有条目, 日志压缩、快照后清理旧日志
32    RemoveEntriesTo(clusterID uint64, nodeID uint64, index uint64) error
33
34
35    // 回收指定索引之前条目占用的存储空间
36    CompactEntriesTo(clusterID uint64, nodeID uint64, index uint64) (<-chan struct{}, error)
37
38
39    // 保存所有快照元数据
40    SaveSnapshots([]pb.Update) error
41
42
43    // 删除指定的快照元数据 清理过时或无效的快照
44    DeleteSnapshot(clusterID uint64, nodeID uint64, index uint64) error
45
46
47    // 列出指定索引范围内的可用快照
48    ListSnapshots(clusterID uint64, nodeID uint64, index uint64) ([]pb.Snapshot, error)
49
50
51    // 删除节点的所有相关数据
52    RemoveNodeData(clusterID uint64, nodeID uint64) error
53
54
55    // 导入快照并创建所有必需的元数据
56    ImportSnapshot(snapshot pb.Snapshot, nodeID uint64) error
57}
58

3.1索引键

存储的底层本质是一个KVDB (pebble or rocksdb),由于业务的复杂性,要统一各类业务key的设计方法,而且要降低空间使用,所以有了如下的key设计方案。

龙舟中key分为3类:

其中,2字节的header用于区分各类不同业务的key空间。

1entryKeyHeader       = [2]byte{0x1, 0x1}  // 普通日志条目
2persistentStateKey   = [2]byte{0x2, 0x2}  // Raft状态
3maxIndexKey          = [2]byte{0x3, 0x3}  // 最大索引记录
4nodeInfoKey          = [2]byte{0x4, 0x4}  // 节点元数据
5bootstrapKey         = [2]byte{0x5, 0x5}  // 启动配置
6snapshotKey          = [2]byte{0x6, 0x6}  // 快照索引
7entryBatchKey        = [2]byte{0x7, 0x7}  // 批量日志
8

在key的生成中,采用了useAsXXXKey和SetXXXKey的方式,复用了data这个二进制变量,减少GC。

1type Key struct {
2    data []byte  // 底层字节数组复用池
3    key  []byte  // 有效数据切片
4    pool *sync.Pool // 似乎并没有什么用
5}
6
7
8func (k *Key) useAsEntryKey() {
9    k.key = k.data
10}
11
12
13type IReusableKey interface {
14    SetEntryBatchKey(clusterID uint64, nodeID uint64, index uint64)
15    // SetEntryKey sets the key to be an entry key for the specified Raft node
16    // with the specified entry index.
17    SetEntryKey(clusterID uint64, nodeID uint64, index uint64)
18    // SetStateKey sets the key to be an persistent state key suitable
19    // for the specified Raft cluster node.
20    SetStateKey(clusterID uint64, nodeID uint64)
21    // SetMaxIndexKey sets the key to be the max possible index key for the
22    // specified Raft cluster node.
23    SetMaxIndexKey(clusterID uint64, nodeID uint64)
24    // Key returns the underlying byte slice of the key.
25    Key() []byte
26    // Release releases the key instance so it can be reused in the future.
27    Release()
28}
29
30
31func (k *Key) useAsEntryKey() {
32    k.key = k.data
33}
34
35
36// SetEntryKey sets the key value to the specified entry key.
37func (k *Key) SetEntryKey(clusterID uint64, nodeID uint64, index uint64) {
38    k.useAsEntryKey()
39    k.key[0] = entryKeyHeader[0]
40    k.key[1] = entryKeyHeader[1]
41    k.key[2] = 0
42    k.key[3] = 0
43    binary.BigEndian.PutUint64(k.key[4:], clusterID)
44    // the 8 bytes node ID is actually not required in the key. it is stored as
45    // an extra safenet - we don't know what we don't know, it is used as extra
46    // protection between different node instances when things get ugly.
47    // the wasted 8 bytes per entry is not a big deal - storing the index is
48    // wasteful as well.
49    binary.BigEndian.PutUint64(k.key[12:], nodeID)
50    binary.BigEndian.PutUint64(k.key[20:], index)
51}
52

3.2变量复用IContext

IContext的核心设计目的是实现并发安全的内存复用机制。在高并发场景下,频繁的内存分配和释放会造成较大的GC压力,通过IContext可以实现:

  • 键对象复用:通过GetKey()获取可重用的IReusableKey
  • 缓冲区复用:通过GetValueBuffer()获取可重用的字节缓冲区
  • 批量操作对象复用:EntryBatch和WriteBatch的复用
1// IContext is the per thread context used in the logdb module.
2// IContext is expected to contain a list of reusable keys and byte
3// slices that are owned per thread so they can be safely reused by the
4// same thread when accessing ILogDB.
5type IContext interface {
6    // Destroy destroys the IContext instance.
7    Destroy()
8    // Reset resets the IContext instance, all previous returned keys and
9    // buffers will be put back to the IContext instance and be ready to
10    // be used for the next iteration.
11    Reset()
12    // GetKey returns a reusable key.
13    GetKey() IReusableKey // 这就是上文中的key接口
14    // GetValueBuffer returns a byte buffer with at least sz bytes in length.
15    GetValueBuffer(sz uint64) []byte
16    // GetWriteBatch returns a write batch or transaction instance.
17    GetWriteBatch() interface{}
18    // SetWriteBatch adds the write batch to the IContext instance.
19    SetWriteBatch(wb interface{})
20    // GetEntryBatch returns an entry batch instance.
21    GetEntryBatch() pb.EntryBatch
22    // GetLastEntryBatch returns an entry batch instance.
23    GetLastEntryBatch() pb.EntryBatch
24}
25
26
27type context struct {
28    size    uint64
29    maxSize uint64
30    eb      pb.EntryBatch
31    lb      pb.EntryBatch
32    key     *Key
33    val     []byte
34    wb      kv.IWriteBatch
35}
36
37
38func (c *context) GetKey() IReusableKey {
39    return c.key
40}
41
42
43func (c *context) GetValueBuffer(sz uint64) []byte {
44    if sz <= c.size {
45        return c.val
46    }
47    val := make([]byte, sz)
48    if sz < c.maxSize {
49        c.size = sz
50        c.val = val
51    }
52    return val
53}
54
55
56func (c *context) GetEntryBatch() pb.EntryBatch {
57    return c.eb
58}
59
60
61func (c *context) GetLastEntryBatch() pb.EntryBatch {
62    return c.lb
63}
64
65
66func (c *context) GetWriteBatch() interface{} {
67    return c.wb
68}
69
70
71func (c *context) SetWriteBatch(wb interface{}) {
72    c.wb = wb.(kv.IWriteBatch)
73}
74

3.3存储引擎封装IKVStore

IKVStore 是 Dragonboat 日志存储系统的抽象接口,它定义了底层键值存储引擎需要实现的所有基本操作。这个接口让 Dragonboat 能够支持不同的存储后端(如 Pebble、RocksDB 等),实现了存储引擎的可插拔性。

1type IKVStore interface {
2    // Name is the IKVStore name.
3    Name() string
4    // Close closes the underlying Key-Value store.
5    Close() error
6
7
8    // 范围扫描 - 支持前缀遍历的迭代器
9    IterateValue(fk []byte,
10            lk []byte, inc bool, op func(key []byte, data []byte) (bool, error)) error
11    
12    // 查询操作 - 基于回调的内存高效查询模式
13    GetValue(key []byte, op func([]byte) error) error
14    
15    // 写入操作 - 单条记录的原子写入
16    SaveValue(key []byte, value []byte) error
17
18
19    // 删除操作 - 单条记录的精确删除
20    DeleteValue(key []byte) error
21    
22    // 获取批量写入器
23    GetWriteBatch() IWriteBatch
24    
25    // 原子提交批量操作
26    CommitWriteBatch(wb IWriteBatch) error
27    
28    // 批量删除一个范围的键值对
29    BulkRemoveEntries(firstKey []byte, lastKey []byte) error
30    
31    // 压缩指定范围的存储空间
32    CompactEntries(firstKey []byte, lastKey []byte) error
33    
34    // 全量压缩整个数据库
35    FullCompaction() error
36}
37
38
39type IWriteBatch interface {
40    Destroy()                 // 清理资源,防止内存泄漏
41    Put(key, value []byte)    // 添加写入操作
42    Delete(key []byte)        // 添加删除操作
43    Clear()                   // 清空批处理中的所有操作
44    Count() int               // 获取当前批处理中的操作数量
45}
46

openPebbleDB是Dragonboat 中 Pebble 存储引擎的初始化入口,负责根据配置创建一个完整可用的键值存储实例。

1// KV is a pebble based IKVStore type.
2type KV struct {
3    db       *pebble.DB
4    dbSet    chan struct{}
5    opts     *pebble.Options
6    ro       *pebble.IterOptions
7    wo       *pebble.WriteOptions
8    event    *eventListener
9    callback kv.LogDBCallback
10    config   config.LogDBConfig
11}
12
13
14var _ kv.IKVStore = (*KV)(nil)
15
16
17// openPebbleDB
18// =============
19//  Dragonboat  LogDBConfig  Pebble 引擎实例
20func openPebbleDB(
21        cfg  config.LogDBConfig,
22        cb   kv.LogDBCallback,   // => busy通知:busy(true/false)
23        dir  string,             // 主数据目录
24        wal  string,             // WAL 独立目录(可空)
25        fs   vfs.IFS,            // 文件系统抽象(磁盘/memfs)
26) (kv.IKVStore, error) {
27    
28    //--------------------------------------------------
29    // 2️⃣ << 核心调优参数读入
30    //--------------------------------------------------
31    blockSz      := int(cfg.KVBlockSize)                    // 数据块(4K/8K…)
32    writeBufSz   := int(cfg.KVWriteBufferSize)              // 写缓冲
33    bufCnt       := int(cfg.KVMaxWriteBufferNumber)         // MemTable数量
34    l0Compact    := int(cfg.KVLevel0FileNumCompactionTrigger) // L0 层文件数量触发压缩的阈值
35    l0StopWrites := int(cfg.KVLevel0StopWritesTrigger)
36    baseBytes    := int64(cfg.KVMaxBytesForLevelBase)
37    fileBaseSz   := int64(cfg.KVTargetFileSizeBase)
38    cacheSz      := int64(cfg.KVLRUCacheSize)
39    levelMult    := int64(cfg.KVTargetFileSizeMultiplier)  // 每层文件大小倍数
40    numLevels    := int64(cfg.KVNumOfLevels)
41    
42    
43    //--------------------------------------------------
44    // 4️⃣ 构建 LSM-tree 层级选项 (每层无压缩)
45    //--------------------------------------------------
46    levelOpts := []pebble.LevelOptions{}
47    sz := fileBaseSz
48    for lvl := 0; lvl < int(numLevels); lvl++ {
49        levelOpts = append(levelOpts, pebble.LevelOptions{
50            Compression:    pebble.NoCompression, // 写性能优先
51            BlockSize:      blockSz,
52            TargetFileSize: sz,                 // L0 < L1 <  呈指数增长
53        })
54        sz *= levelMult
55    }
56    
57    //--------------------------------------------------
58    // 5️⃣ 初始化依赖:LRU Cache + 读写选项
59    //--------------------------------------------------
60    cache := pebble.NewCache(cacheSz)    // block缓存
61    ro    := &pebble.IterOptions{}       // 迭代器默认配置
62    wo    := &pebble.WriteOptions{Sync: true} // ❗fsync强制刷盘
63    
64    opts := &pebble.Options{
65        Levels:                      levelOpts,
66        Cache:                       cache,
67        MemTableSize:                writeBufSz,
68        MemTableStopWritesThreshold: bufCnt,
69        LBaseMaxBytes:               baseBytes,
70        L0CompactionThreshold:       l0Compact,
71        L0StopWritesThreshold:       l0StopWrites,
72        Logger:                      PebbleLogger,
73        FS:                          vfs.NewPebbleFS(fs),
74        MaxManifestFileSize:         128 * 1024 * 1024,
75        // WAL 目录稍后条件注入
76    }
77    
78    kv := &KV{
79        dbSet:    make(chan struct{}),          // 关闭->初始化完成信号
80        callback: cb,                           // 上层 raft engine 回调
81        config:   cfg,
82        opts:     opts,
83        ro:       ro,
84        wo:       wo,
85    }
86    
87    event := &eventListener{
88        kv:      kv,
89        stopper: syncutil.NewStopper(),
90    }
91    
92    // => 关键事件触发
93    opts.EventListener = pebble.EventListener{
94        WALCreated:    event.onWALCreated,
95        FlushEnd:      event.onFlushEnd,
96        CompactionEnd: event.onCompactionEnd,
97    }
98    
99    //--------------------------------------------------
100    // 7️⃣ 目录准备
101    //--------------------------------------------------
102    if wal != "" {
103        fs.MkdirAll(wal)        // 📁  WAL 单独磁盘预留
104        opts.WALDir = wal
105    }
106    fs.MkdirAll(dir)            // 📁 主数据目录
107    
108    //--------------------------------------------------
109    // 8️⃣ 真正的数据库实例化
110    //--------------------------------------------------
111    pdb, err := pebble.Open(dir, opts)
112    if err != nil { return nil, err }
113    
114    //--------------------------------------------------
115    // 9️⃣ 🧹 资源整理 & 启动事件
116    //--------------------------------------------------
117    cache.Unref()               // 去除多余引用,防止泄露
118    kv.db = pdb
119    
120    // 🔔 手动触发一次 WALCreated 确保反压逻辑进入首次轮询
121    kv.setEventListener(event)  // 内部 close(kv.dbSet)
122    
123    return kv, nil
124}
125

其中eventListener是对pebble 内存繁忙的回调,繁忙判断的条件有两个:

  • 内存表大小超过阈值(95%)
  • L0 层文件数量超过阈值(L0写入最大文件数量-1)
1
2
3func (l *eventListener) notify() {
4    l.stopper.RunWorker(func() {
5        select {
6        case <-l.kv.dbSet:
7            if l.kv.callback != nil {
8                memSizeThreshold := l.kv.config.KVWriteBufferSize *
9                    l.kv.config.KVMaxWriteBufferNumber * 19 / 20
10                l0FileNumThreshold := l.kv.config.KVLevel0StopWritesTrigger - 1
11                m := l.kv.db.Metrics()
12                busy := m.MemTable.Size >= memSizeThreshold ||
13                    uint64(m.Levels[0].NumFiles) >= l0FileNumThreshold
14                l.kv.callback(busy)
15            }
16        default:
17        }
18    })
19}
20

3.4日志条目存储DB

db结构体是Dragonboat日志数据库的核心管理器,提供Raft日志、快照、状态等数据的持久化存储接口。是桥接了业务和pebble存储的中间层。

1// db is the struct used to manage log DB.
2type db struct {
3    cs      *cache       // 节点信息、Raft状态信息缓存
4    keys    *keyPool     // Raft日志索引键变量池
5    kvs     kv.IKVStore  // pebble的封装
6    entries entryManager // 日志条目读写封装
7}
8
9
10// 这里面的信息不会过期,叫寄存更合适
11type cache struct {
12    nodeInfo       map[raftio.NodeInfo]struct{}
13    ps             map[raftio.NodeInfo]pb.State
14    lastEntryBatch map[raftio.NodeInfo]pb.EntryBatch
15    maxIndex       map[raftio.NodeInfo]uint64
16    mu             sync.Mutex
17}
18
  • 获取一个批量写容器

实现:

1func (r *db) getWriteBatch(ctx IContext) kv.IWriteBatch {
2    if ctx != nil {
3        wb := ctx.GetWriteBatch()
4        if wb == nil {
5            wb = r.kvs.GetWriteBatch()
6            ctx.SetWriteBatch(wb)
7        }
8        return wb.(kv.IWriteBatch)
9    }
10    return r.kvs.GetWriteBatch()
11}
12

降低GC压力

  • 获取所有节点信息

实现:

1func (r *db) listNodeInfo() ([]raftio.NodeInfo, error) {
2    fk := newKey(bootstrapKeySize, nil)
3    lk := newKey(bootstrapKeySize, nil)
4    fk.setBootstrapKey(0, 0)
5    lk.setBootstrapKey(math.MaxUint64, math.MaxUint64)
6    ni := make([]raftio.NodeInfo, 0)
7    op := func(key []byte, data []byte) (bool, error) {
8        cid, nid := parseNodeInfoKey(key)
9        ni = append(ni, raftio.GetNodeInfo(cid, nid))
10        return true, nil
11    }
12    if err := r.kvs.IterateValue(fk.Key(), lk.Key(), true, op); err != nil {
13        return []raftio.NodeInfo{}, err
14    }
15    return ni, nil
16}
17
  • 保存集群状态

实现:

1type Update struct {
2    ClusterID uint64  // 集群ID,标识节点所属的Raft集群
3    NodeID    uint64  // 节点ID,标识集群中的具体节点
4
5
6    State  // 包含当前任期(Term)、投票节点(Vote)、提交索引(Commit)三个关键持久化状态
7
8
9    EntriesToSave []Entry    // 需要持久化到稳定存储的日志条目
10    CommittedEntries []Entry // 已提交位apply的日志条目
11    MoreCommittedEntries bool  // 指示是否还有更多已提交条目等待处理
12
13
14    Snapshot Snapshot  // 快照元数据,当需要应用快照时设置
15
16
17    ReadyToReads []ReadyToRead  // ReadIndex机制实现的线性一致读
18
19
20    Messages []Message  // 需要发送给其他节点的Raft消息
21
22
23    UpdateCommit struct {
24        Processed         uint64  // 已推送给RSM处理的最后索引
25        LastApplied       uint64  // RSM确认已执行的最后索引
26        StableLogTo       uint64  // 已稳定存储的日志到哪个索引
27        StableLogTerm     uint64  // 已稳定存储的日志任期
28        StableSnapshotTo  uint64  // 已稳定存储的快照到哪个索引
29        ReadyToRead       uint64  // 已准备好读的ReadIndex请求索引
30    }
31}
32
33
34func (r *db) saveRaftState(updates []pb.Update, ctx IContext) error {
35      // 步骤1:获取写入批次对象,用于批量操作提高性能
36      // 优先从上下文中获取已存在的批次,避免重复创建
37      wb := r.getWriteBatch(ctx)
38      
39      // 步骤2:遍历所有更新,处理每个节点的状态和快照
40      for _, ud := range updates {
41          // 保存 Raft 的硬状态(Term、Vote、Commit)
42          // 使用缓存机制避免重复保存相同状态
43          r.saveState(ud.ClusterID, ud.NodeID, ud.State, wb, ctx)
44          
45          // 检查是否有快照需要保存
46          if !pb.IsEmptySnapshot(ud.Snapshot) {
47              // 快照索引一致性检查:确保快照索引不超过最后一个日志条目的索引
48              // 这是 Raft 协议的重要约束,防止状态不一致
49              if len(ud.EntriesToSave) > 0 {
50                  lastIndex := ud.EntriesToSave[len(ud.EntriesToSave)-1].Index
51                  if ud.Snapshot.Index > lastIndex {
52                      plog.Panicf("max index not handled, %d, %d",
53                          ud.Snapshot.Index, lastIndex)
54                  }
55              }
56              
57              // 保存快照元数据到数据库
58              r.saveSnapshot(wb, ud)
59              
60              // 更新节点的最大日志索引为快照索引
61              r.setMaxIndex(wb, ud, ud.Snapshot.Index, ctx)
62          }
63      }
64      
65      // 步骤3:批量保存所有日志条目
66      // 这里会调用 entryManager 接口的 record 方法,根据配置选择批量或单独存储策略
67      r.saveEntries(updates, wb, ctx)
68      
69      // 步骤4:提交写入批次到磁盘
70      // 只有在批次中有实际操作时才提交,避免不必要的磁盘 I/O
71      if wb.Count() > 0 {
72          return r.kvs.CommitWriteBatch(wb)
73      }
74      return nil
75  }
76  
77
  • 保存引导信息

实现:

1func (r *db) saveBootstrapInfo(clusterID uint64,
2    nodeID uint64, bs pb.Bootstrap) error {
3    wb := r.getWriteBatch(nil)
4    r.saveBootstrap(wb, clusterID, nodeID, bs)
5    return r.kvs.CommitWriteBatch(wb) // 提交至Pebble
6}
7
8
9func (r *db) saveBootstrap(wb kv.IWriteBatch,
10    clusterID uint64, nodeID uint64, bs pb.Bootstrap) {
11    k := newKey(maxKeySize, nil)
12    k.setBootstrapKey(clusterID, nodeID) // 序列化集群节点信息
13    data, err := bs.Marshal()
14    if err != nil {
15        panic(err)
16    }
17    wb.Put(k.Key(), data)
18}
19
  • 获取Raft状态

实现:

1func (r *db) getState(clusterID uint64, nodeID uint64) (pb.State, error) {
2    k := r.keys.get()
3    defer k.Release()
4    k.SetStateKey(clusterID, nodeID)
5    hs := pb.State{}
6    if err := r.kvs.GetValue(k.Key(), func(data []byte) error {
7        if len(data) == 0 {
8            return raftio.ErrNoSavedLog
9        }
10        if err := hs.Unmarshal(data); err != nil {
11            panic(err)
12        }
13        return nil
14    }); err != nil {
15            return pb.State{}, err
16    }
17    return hs, nil
18}
19

3.5对外存储API实现

龙舟对ILogDB提供了实现:ShardedDB,一个管理了多个pebble bucket的存储单元。

1var _ raftio.ILogDB = (*ShardedDB)(nil)
2// ShardedDB is a LogDB implementation using sharded pebble instances.
3type ShardedDB struct {
4    completedCompactions uint64             // 原子计数器:已完成压缩操作数
5    config               config.LogDBConfig // 日志存储配置
6    ctxs                 []IContext         // 分片上下文池,减少GC压力
7    shards               []*db              // 核心:Pebble实例数组
8    partitioner          server.IPartitioner // 智能分片策略器
9    compactionCh         chan struct{}      // 压缩任务信号通道
10    compactions          *compactions       // 压缩任务管理器
11    stopper              *syncutil.Stopper  // 优雅关闭管理器
12}
13
  • 初始化过程

实现:

1// 入口函数:创建并初始化分片日志数据库
2OpenShardedDB(config, cb, dirs, lldirs, batched, check, fs, kvf):
3
4
5    // ===阶段1:安全验证===
6    if 配置为空 then panic
7    if check和batched同时为true then panic
8
9
10    // ===阶段2:预分配资源管理器===
11    shards := 空数组
12    closeAll := func(all []*db) { //出错清理工具
13        for s in all {
14            s.close()
15        }
16    }
17
18
19    // ===阶段3:逐个创建分片===
20    loop i := 0  分片总数:
21        datadir := pathJoin(dirs[i], "logdb-"+i)  //数据目录
22        snapdir := ""                           //快照目录(可选)
23        if lldirs非空 {
24            snapdir = pathJoin(lldirs[i], "logdb-"+i)
25        }
26
27
28        shardCb := {shard:i, callback:cb}      //监控回调
29        db, err := openRDB(...)                //创建实际数据库实例
30        if err != nil {                        //创建失败
31            closeAll(shards)                   //清理已创建的
32            return nil, err
33        }
34        shards = append(shards, db)
35
36
37    // ===阶段5:核心组件初始化===
38    partitioner := 新建分区器(execShards数量, logdbShards数量)
39    instance := &ShardedDB{
40        shards:      shards,
41        partitioner: partitioner,
42        compactions: 新建压缩管理器(),
43        compactionCh: 通道缓冲1,
44        ctxs:       make([]IContext, 执行分片数),
45        stopper:    新建停止器()
46    }
47
48
49    // ===阶段6:预分配上下文&启动后台===
50    for j := 0  执行分片数:
51        instance.ctxs[j] = 新建Context(saveBufferSize)
52
53
54    instance.stopper.RunWorker(func() {        //后台压缩协程
55        instance.compactionWorkerMain()
56    })
57
58
59    return instance, nil                      //构造完成
60    
61

  • 保存集群状态

实现:

1func (s *ShardedDB) SaveRaftState(updates []pb.Update, shardID uint64) error {
2    if shardID-1 >= uint64(len(s.ctxs)) {
3        plog.Panicf("invalid shardID %d, len(s.ctxs): %d", shardID, len(s.ctxs))
4    }
5    ctx := s.ctxs[shardID-1]
6    ctx.Reset()
7    return s.SaveRaftStateCtx(updates, ctx)
8}
9
10
11func (s *ShardedDB) SaveRaftStateCtx(updates []pb.Update, ctx IContext) error {
12    if len(updates) == 0 {
13        return nil
14    }
15    pid := s.getParititionID(updates)
16    return s.shards[pid].saveRaftState(updates, ctx)
17}
18

以sylas为例子,我们每个分片都是单一cluster,所以logdb只使用了一个分片,龙舟设计初衷是为了解放多cluster的吞吐,我们暂时用不上,tindb可以考虑

四、总结

LogDB是Dragonboat重要的存储层实现,作者将Pebble引擎包装为一组通用简洁的API,极大方便了上层应用与存储引擎的交互成本。

其中包含了很多Go语言的技巧,例如大量的内存变量复用设计,展示了这个库对高性能的极致追求,是一个十分值得学习的优秀工程案例。

往期回顾

1. 从数字到版面:得物数据产品里数字格式化的那些事

2. 一文解析得物自建 Redis 最新技术演进

3. Golang HTTP请求超时与重试:构建高可靠网络请求|得物技术

4. RN与hawk碰撞的火花之C++异常捕获|得物技术

5. 得物TiDB升级实践

文 /酒米

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。


# 一、项目概览 Dragonboat 是纯 Go 实现的(multi-group)Raft 库。 为应用屏蔽 Raft 复杂性,提供易于使用的 NodeH》 是转载文章,点击查看原文


相关推荐


LeetCode 202. 快乐数
-森屿安年-2025/11/24

目录 相似题目原题题目解析算法原理代码实现 相似题目 环形链表 环形链表 II 原题 快乐数 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」 定义为: 对于一个正整数,每一次将该数替换为它每个位置上的数字的平方和。然后重复这个过程直到这个数变为 1,也可能是 无限循环 但始终变不到 1。如果这个过程 结果为 1,那么这个数就是快乐数。 如果 n 是快乐数就返回 true ;不是,则返回 false 。 示例 1: 输入:n = 19 输出:t


水果识别系统【最新版】Python+TensorFlow+Vue3+Django+人工智能+深度学习+卷积神经网络算法
青瓷程序设计2025/11/23

一、项目介绍 水果识别系统,通过TensorFlow搭建卷积神经网络算法,并收集了10种常见的水果数据集(‘哈密瓜’, ‘椰子’, ‘樱桃’, ‘火龙果’, ‘猕猴桃’, ‘红苹果’, ‘芒果’, ‘葡萄’, ‘西瓜’, ‘香蕉’),对其进行多轮迭代训练,最后得到了一个精度较高的模型,并搭建Web可视化操作平台。 前端: Vue3、Element Plus 后端:Django 算法:TensorFlow、卷积神经网络算法 具体功能: 系统分为管理员和用户两个角色,登录后根据角色显示其可访问的


单片机简单介绍
_the_sun2025/11/21

目录 一、单片机简介 二、单片机的命名规则与封装 三、单片机内部结构图与最小系统 (1)内部模块结构图 (2)最小系统         虽然以前也简单学习过单片机,但是当时还未曾学习数电模电,对于各种芯片、工作原理的理解较为浅显,只是停留在代码如何写的方面。现在补充了一定的数模电基础后想巩固一下知识,考虑到并未写一份单片机使用说明,于是开始尝试本系列。 一、单片机简介         单片机(Micro Controller Unit),简称MCU。即微小控制单元,通常用


Lua 的 collectgarbage 函数
IMPYLH2025/11/19

Lua 的 collectgarbage 函数 是用于控制垃圾回收机制的重要工具。该函数提供了多种操作模式来管理内存回收: 基本语法: collectgarbage(opt [, arg]) 主要操作选项: "collect" - 执行一次完整的垃圾回收循环"stop" - 暂停垃圾回收器"restart" - 重新启动垃圾回收器"count" - 返回当前 Lua 使用的内存总量(以 KB 为单位)"step" - 执行单步垃圾回收"setpause" - 设置回收器的暂停参数"set


马斯克Grok 4.1低调发布!通用能力碾压其他一切模型
机器之心2025/11/18

几乎毫无预兆,马斯克人工智能公司 xAI 发布了最新模型 Grok 4.1。 就在刚刚,xAI 宣布,Grok 4.1 已经向所有用户开放,可以在 Grok 官网、X 以及 iOS 和 Android 应用中使用。 Grok 4.1将立即在 Auto 模式中推送,并可在模型选择器中手动选择。 此次,Grok 4.1 将在真实世界可用性方面带来显著提升,尤其是在创造力、情感互动和协作交互方面表现出色。Grok 4.1 对细微意图的感知能力更强,与用户对话更加吸引人,整体人格也更连贯,同时完全保


Python 的内置函数 sorted
IMPYLH2025/11/17

Python 内建函数列表 > Python 的内置函数 sorted Python 的内置函数 sorted() 是一个用于排序的可迭代对象的高阶函数,它接受一个可迭代对象作为输入,并返回一个新的已排序的列表。与列表的 sort() 方法不同,sorted() 不会修改原始的可迭代对象,而是生成一个新的排序后的列表。 基本用法 sorted(iterable, key=None, reverse=False) iterable:需要排序的可迭代对象(如列表、元组、字符串等)key(可


线性代数 - 叉积的分量形式与矩阵形式
二分掌柜的2025/11/16

线性代数 - 叉积的分量形式与矩阵形式 flyfish 单位基向量与 向量的分解 1. 三维坐标系与单位基向量 图中蓝色的x轴、红色的y轴、绿色的z轴构成了一个右手系三维笛卡尔坐标系(符合“右手定则”的空间定向)。 在这个坐标系中,有三个单位基向量: i^\hat{i}i^(蓝色箭头):沿x轴方向的单位向量; j^\hat{j}j^​(红色箭头):沿y轴方向的单位向量; k^\hat{k}k^(绿色箭头):沿z轴方向的单位向量。 这三个基向量两两垂直(夹角为90∘90^\circ90∘) 2.


docker启动失败
AI小胖2025/11/15

** ** 问题很明确了:Failed at step LIMITS spawning /usr/bin/dockerd: Operation not permitted 和 status=205/LIMITS。这表明 systemd 在设置资源限制时遇到了权限问题。 解决方案: 1. 移除或修复有问题的资源限制配置 # 移除我们之前创建的 override 配置 rm -f /etc/systemd/system/docker.service.d/override.conf # 重新加载


实时大数据计算中,Spark的滑动窗口和允许消息迟到机制
sword_csdn2025/11/13

目录 1.开发环境2.几句话先概括3.例子说明3.1.参数配置3.2.窗口是如何产生的3.3.Trigger触发机制3.4.迟到的消息数据 最近做了个实时大数据分析的项目,发现很多东西都忘记了,属实没有好好整理笔记之过,趁眼下闲暇,做个回忆和记录。 1.开发环境 这次环境采用Java17+,Scala2.13,Spark的版本为4.0.0,且基于Kafka创建读取流。其它环境可参考以下maven pom。 <?xml version="1.0" encoding="UTF-8"


centos运维常用命令
KV_T2025/11/12

CentOS 服务器运维中,以下是按场景分类的常用命令,涵盖系统监控、用户管理、服务管理、文件操作等核心场景,适合日常运维参考: 一、系统状态监控 查看系统负载 uptime # 显示系统运行时间、用户数、1/5/15分钟负载 w # 更详细的负载信息,包括登录用户和进程 CPU 监控 top # 实时查看CPU、内存占用(按q退出) htop # 交互式CPU/内存监控(需安装:yum install htop) lscpu

首页编辑器站点地图

本站内容在 CC BY-SA 4.0 协议下发布

Copyright © 2025 聚合阅读