绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
KV 存储引擎 - Badger源码分析
2022-04-21 10:39:50

KV 存储引擎 - Badger源码分析

  1. kv引擎的应用场景

  2. badger源码分析

  3. 优化与挑战

  4. 参考资料

为什么要学习KV引擎

kv引擎是提供对kv数据结构高性能读写的组件,通常嵌入其他DB server中提供从磁盘读写kv数据的一系列解决方案。

DB的数据模型通常分为,KV,表模型,文档模型,列存储,图模型。后四种从理论上都是可以由kv模型构建起来的。因此kv引擎成为现代很多nosql 数据库的底层存储引擎。旨在提供高性能的读写系统。kv引擎通常的数据结构是LSM的实现,天然对写入的高吞吐,是各种在线及分析型DB的理想选择。总结起来就是当今的各种分布式数据库都是以KV作为底层存储模型,在计算层映射为其目标存储模型的。可以说 一切皆kv

工业上,levelDB,rocksDB,badger 是比较知名的kv存储引擎。TIDB(分布式关系型)底层封装rocksDB,influxDB(时序数据库)早期使用leveldb,MangoDB(文档数据库)底层也是kv存储,Dgraph(图数据库)底层使用badger作为存储,Hbase(列存储数据库)存储在分布式文件系统上的KV模型,以及在大型互联网公司会自研分布式kv数据库以支持大规模的在线读写请求(缓存,以及索引),等价实现Redis协议,将底层内存存储换成分布式的RocksDB等持久化存储引擎,例如picak数据库。

如何阅读Go项目的源码

阅读官方文档

从文档中了解,项目所要解决的问题是什么,这个项目的意义是什么,了解足够的背景知识。

badger主要解决的是单机高性能的进行磁盘IO来读写kv数据并提供完善的api应对各种场景,同时badger将对于其他kv引擎来说其优势就是实现了whiskery论文,更加充分发挥ssd的优势,同时badger也是专门为了Dgraph这个图数据库开发的项目,为了解决go语言cgo调用rocksDB的复杂问题。

阅读官方推荐的资料

通常在文档或者wiki中会说明该项目参考了哪些经典资料,总能追溯到某个论文或者文章作为作者的灵感来源,找到一篇或多篇这样的文章仔细研读。

比如你就可以找到官网的whiskery论文

阅读官方examples

找到官方使用的例子,查看这个项目是如何被使用的。

工业级实现 badger

  1. DB 初始化

  2. 读写事务

  3. 只读事务

  4. 范围查询

  5. GC

  6. LSM日志合并

源码安装项目

go get github.com/dgraph-io/badger/v3
复制代码

写一个例子

package main

import (

        "fmt"

        badger "github.com/dgraph-io/badger/v3"

)

func main() {

  //  打开DB

  db, err := badger.Open(badger.DefaultOptions("/tmp/badger"))

  defer db.Close()

  

  // 读写事务

  err = db.Update(func(txn *badger.Txn) error {

      // Your code here…

      txn.Set([]byte("answer"), []byte("42"))

      txn.Get([]byte("answer"))

      return nil

  })

  // 只读事务

  err = db.View(func(txn *badger.Txn) error {

      // Your code here…

      txn.Get([]byte("answer_v1"))

      return nil

  })

  // 遍历keys

 err = db.View(func(txn *badger.Txn) error {

          opts := badger.DefaultIteratorOptions

          opts.PrefetchSize = 10

          it := txn.NewIterator(opts)

          defer it.Close()

          for it.Rewind(); it.Valid(); it.Next() {

               item := it.Item()

               k := item.Key()

               err := item.Value(func(v []byte) error {

                  fmt.Printf("key=%s, value=%s\n", k, v)

                  return nil

               })

               if err != nil {

                  return err

               }

           }

           return nil

       })

   // vlog 的GC

   err = db.RunValueLogGC(0.7)

   _ = err

}
复制代码

使用 devle 进行debug调试

阅读源码有一个基本问题,那就是遇到一个函数究竟是先进去看实现自底向上的去阅读,还是先理解它的输入输出理解这个函数做了什么事情自顶向下的阅读。

delve 是一个go语言调试工具。

DB初始化

  1. 构造配置参数对象

  2. 打开或创建工作目录并上锁

  3. 打开或创建ManifestFile文件

  4. 创建DB对象

    1. 创建内存表列表,用于预写日志
    2. 创建通知刷新任务的channel
    3. 创建写入任务的channel
    4. 配置对象
    5. manifest文件对象
    6. 目录锁
    7. value目录锁
    8. 创建oracle对象,用于并发事务的控制
  5. 创建一个块缓存

  6. 创建一个索引缓存

  7. 开启key注册器

  8. 打开memtable 并全部追加到 imm 的列表中

    1. 创建一个内存跳表对象
    2. 封装.mem文件为一个logFile对象取名为wal
    3. 打开 .mem结尾的文件并关联为mmap文件
  9. 创建一个激活的内存表

  10. 创建一个level管理器

    1. 创建一个tables的二维数组
    2. 初始化每一个.sst结尾的文件关联为一个mmap文件
    3. 根据manfist里的记录,初始化每一层的table对象
    4. 如果是L0层则按fid排序,否则按每个表中小key进行排序
  11. vlog初始化

    1. 打开一个DISCARD的文件并关联为mmap文件
  12. 启动日志合并过程

  13. 获取DB的事务大版本号

  14. 打开vlog文件, 关联mmap

    1. 按fid排序处理vlog文件
    2. 读取目录填充vlog file 的map
    3. 如果没有vlog 文件则创建一个新的
    4. 如果vlog文件是空的则删除
    5. 获取vlog文件的实际大小并截断文件
    6. 创建一个新的活跃的vlog文件
  15. 开启负责处理写请求的工作协程

  16. 启动vlog文件的GC过程

读写事务

  1. 检查事务是否关闭

  2. 创建一个事务对象

    1. 如果是一个读写事务

      1. 创建一个存储哪些key存在冲突的记录map

      2. 记录在当前事务上发生写入的key的列表

      3. 分配一个事务读取时间戳

        1. 从 nextReadTnx中获取一个时间戳的值

        2. 然后标记为开始,记录后的事物时间戳后发送给一个markChan

        3. 等待与这个时间戳冲突的事物提交完成

          1. 获取后一个已提交的时间戳
          2. 创建一个用于wait回调的chan
          3. 等待waitChan的回调,或者上下文的取消
  3. Defer 丢弃终的事务

    1. 标记readTs时间戳已经完成
  4. 执行闭包函数

    1. Set kv

      1. 检查kv的合法性
      2. 检查当前事务的执行的命令数量以及存储大小是否超过阈值
      3. 按照key计算一个hash值,然后加入冲突检查map中
      4. 如果这个key 在当前事务中被写入过,并且与之前的版本不同,则计入重复写入的数组中
      5. 将该key的写入操作记录到pending数组里
    2. Get k

      1. 如果这是一个读写事务

        1. 如果在pending数组里存在一个key,并且没有过期 则复制数据并返回
  5. 提交事务

    1. 检查 pending数组是否为空,如果为空则直接返回

    2. 事务提交的前置检查

    3. 提交并发送日志返回一个回调函数进行等待

      1. 获取一把锁

      2. 创建一个提交时间戳

        1. 是否存在冲突

          1. 如果读时间戳大于已提交的时间戳则忽略
          2. 读时间戳小于已提交的时间戳则判断是否存在读后写的情况存在就冲突
        2. 拿到当前事务的readTs标记其为完成

        3. 清理已经提交的事务的记录数组

        4. 标记当前的事务开始进行提交,分配了一个新的提交时间戳

        5. 将当前提交事务的时间戳和冲突的key组成对象记录在已提交事务的数组中

      3. 遍历每个在当前事务中写入的key,为其分配的版本号

      4. 遍历pending数组处理每一个实体kv对象

      5. 追加一条标记事务结束的内部kv实体

        1. 创建一个写入请求

        2. 发送给写者channel中

        3. 返回一个回调函数

          1. 等待批量写请求处理完成
          2. 标记事务已经提交完成
          3. 将buf写入到mmap关联的文件内存中
        4. 写入磁盘

          1. 如果值日志offset大于vlog大文件大小,或者写入条数超购阈值

            1. 写入磁盘 如果设置了同步则会系统调用sync 后根据offset进行截断
            2. 创建一个新的vlog文件

WaterMark 实现 - > badger 事务的设计

doWrite实现

  1. 读取writeCh中的req

  2. Select writeCh 批量拼装 reqs

  3. 如果没有请求,则pending阻塞住,等待上一次批量写入的完成

  4. 开启一个协程处理此次写请求

    1. 整个写入vlog文件

      1. 检查写入的vlaue是否合法值是否足够写入4G大小的文件,因为值指针是uin32

      2. 获取vlog的文件锁,获取大的文件id,大的也是活跃的

      3. 拿到句柄后遍历reqlist

        1. 解析每个请求的值查看大小是否超过分离的阈值
        2. 拿到vlog当前的offset值
        3. 用fid和offset拼接一个值的指针
        4. 编码kv数据写入buf
    2. 分req写入LSM中

      1. 检查当前的值是否超过了值的阈值

      2. 没有超过则 直接将值存储到mt中

        1. 先写入wal中,用于日志恢复

          1. 编码kv数据

            1. kv数据在文件中的编码是 head,key,value,crc
            2. 其中head 包括 keyLen,valueLen, expiresAt, meta,userMeta
          2. 直接copy数据到mmap内存上

        2. 真正存储到跳表中

      3. 超过则 将值的指针存储到mt中 指针指向vlog中的位置

  5. 当该doWrite关闭后,先读取所有writeCh中的请求后后处理一次写入请求

只读事务

  1. 与读写事务的区别 就是只获取了读时间戳,而不需要提交事务

  2. 检查key是否合法

  3. 创建一个itme对象

  4. 如果是一个读写事务 则从pending数组中找是否有刚刚被修改的key

    1. 如果命中则直接返回
    2. 记录当前的读取的key到reads数组中
  5. 以readTs拼接时间戳版本号

  6. 获取内存表列表

  7. 遍历内存表从跳表中get key信息,并且其大版本号小于等于查询版本时直接返回

  8. 如果没有则从level中查询

    1. 逐层遍历

      1. 从某一层获取数据

        1. 获取本层中sstable句柄

          1. 如果是0层则倒序获取所有的table对象
          2. 对于非0层则通过二分法比较每个表的大key来至多确定一个table对象
        2. 用不带有时间戳的key计算hash值

        3. 遍历tables

          1. 判断此table是否存在此key,使用boolFilter

          2. 创建一个迭代器对象

          3. Seek 这个key

            1. 判断是向前迭代还是向后迭代

            2. 置位指针是否从头迭代

            3. 二分查找确定key在哪一个block上

              1. 获取一个idx块 如果索引块缓存中没有 就mmap中获取
              2. 更新索引快到缓存中
            4. 从索引块中获得此key所在的bolckIdx的位置

            5. 然后先从缓存中加载块缓存的地址缓存的key是fid+offset组成

            6. 没有则从mmap映射的文件句柄中获取并加载到内存后返

            7. 解析当前block的元数据

          4. 查找block中的key

            1. 二分法搜索这个block
            2. 解析basekey,前缀压缩
            3. 基于offset进行编解码,找到对应的key,value字节数据
        4. 比较value的版本,返回大版本号的value对象

  9. 拼装数据返回结果item

遍历keys

  1. 创建一个默认的迭代器对象

  2. 配置预读kv条数为10

  3. 创建一个事务层迭代器对象

    1. 获取内存表列表

    2. 给当前vlog对象一个迭代器引用计数表示当前迭代器

    3. 创建一个迭代器数组

    4. 创建一个对PendingWrites数组的迭代器,并指定是正向还是反向

      1. 如果不是读写事务,或者PendingWrites长度为0,则直接返回
      2. 否则对PendingWrites数组返回排序的切片,并返回迭代器
    5. 为每个内存表创建一个迭代器并append到迭代器数组中

    6. 为每个level创建一个迭代器

      1. 遍历每一个level中的table创建迭代器

        1. 如果是L0层则所有的table都要创建迭代器

          1. 判断如果没有指定前缀则默认都需要遍历
          2. 如果指定了前缀则判断前缀是否在此table中如果不在则不用创建迭代器
        2. 如果是非0层则先判断是否需要前缀迭代需要的话则按前缀过滤

        3. 使用boolfilter过滤掉一定不存在前缀key的table

    7. 将table按遍历方向进行反转后返回

    8. 根据迭代器数组创建一个合并迭代器

      1. 创建一个merge迭代器对象,配置其迭代方向
      2. 将迭代器数组构建为一个颗二叉树
  4. seek(nil) 将游标移动到初始位置,遍历的,迭代性的调研底层迭代器的seek

  5. 预先读取几个kv到迭代器的链表中

    1. 跳过内部key
    2. 跳过版本大于readTs的key,说明这是在此迭代器开始之后写入的key不能被读取
    3. 如果指定返回所有版本的key则无论迭代到什么key都直接返回
    4. 判断是否是前向遍历,判断与后一次返回的key是否是同一个是则忽略,并立即更新好访问的key,以保证快照在del情况下的一致性
  6. 解析value的值,并从中判断是否过期与删除

  7. 创建一个item对象,并填充

    1. 如果值是一个值指针,则创建一个协程异步的读取vlog文件
    2. 值指针由fid+offset+len组成,管理好vlog文件分段的读写锁
    3. 然后上锁去读取mmap中的数据
  8. 放到预先读取的list上,直到读取了足够的预取内容

  9. 检查itme是否合法,如果合法就是说item不为空,则调用next方法再次执行上述步骤

vlog 的GC

  1. 获取一个vlog文件并执行GC

    1. 如果有其他GC任务执行则返回错误

    2. 选择一个vlog文件

      1. 选择包含多可丢弃数据的vlog文件

        1. 遍历所有的槽位置,比较每一个vlog文件可丢弃数据数量,返回大的fid
        2. 拿到fid后更新这个vlog文件的可丢弃数量
        3. 判断当前fid是否被删除,删除的话更新并重新获取
        4. 判断当前的fid存在的kv中的可丢弃数据大于阈值
        5. 大于则返回
    3. 当这个文件进行GC

  2. 重写当前的vlog文件

    1. 从头遍历该文件的kv
    2. 拿到key后去lsm中查询是否被删除
    3. 如果没有被删除,则将新版本的值写入到wb中
    4. wb达到阈值后批量的再次写入DB中
    5. 判断当前文件上是否还有引用存在,没有的话就可以从map中删除了
    6. 然后判断是否可以立即将文件物理删除
  3. 更新GC统计结果

LSM日志合并

  1. 启动合并协程

    1. 启动4个协程进行压缩

      1. 随机延迟1s执行

      2. 每10s 2号协程序 执行一次

        1. 计算目标层,其原理是保证每一层的存储大小为上一层的一个量级
        2. 目标层会是Lbase层,比如L0层在DB为空时会直接压缩到L6层
      3. 执行合并逻辑

        1. 创建一个压缩任务

        2. 如果是0压缩,那就执行l0的特殊压缩策略

          1. 先尝试L0到Lbase的压缩

            1. 对压缩过程上锁
            2. 计算L0层中存在数据重合的sstable
            3. 计算Lbase层与L0层的sstable 列表重合范围有重合的sstable
            4. 将这些tables 全部加入compactStatus 的tables 的map中
          2. 失败了会尝试l0到l0的压缩过程

            1. 计算L0层有重合的sstable
            2. 然后将其加入compactStatus 的tables 的map中
            3. 如果当前表在进行压缩则直接忽略执行
        3. 运行压缩任务

          1. 对合并的table 合理的拆分为多个可并行的子压缩

          2. 真正的合并表返回新

            1. 对合并的表创建一个迭代器
            2. 使用迭代器遍历所有合并的表
            3. 然后写入buffer注意分割
            4. 将discard写入统计中
          3. 将其写入清淡文件

          4. 对新表与旧表进行替换

          5. 删除旧表

          6. 打印日志

  2. 启动flush 内存表协程

    1. 接受flushChan的消息,并且做一个聚合操作

    2. 对一段时间内的多个内存表进行一个合并

    3. 创建一个迭代器然后处理合并过程

      1. 迭代表结构,然后获取key判断是否写入buffer
      2. 将buffer数据写入创建的新表中
      3. 更新l0层的清单文件

总结反思

  1. 学习官方资料 是什么,有什么意义,怎么用,设计原理是什么,存在什么局限
  2. 将项目跑起来,可以进行Debug
  3. 结合官方demo或者单元测试,构建一个使用场景,进行Debug阅读
  4. 遍先BFS阅读,第二遍BFS+DFS阅读,第三遍DFS阅读
  5. 对于无法debug的问题,通过IDE阅读,从单元测试入手寻找代码位置
  6. 绘制思维导图或者流程,架构图,对核心数据结构可以绘制UML图
  7. 总结与反思

优化与挑战

  1. 热点识别与更新
  2. 动态参数的调整
  3. 这一期视频的不足之处
  4. 阅读源码后我们做什么?维护开源社区,贡献代码

作者:wonderstone
链接:https://juejin.cn/post/7074530912473448485

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Badger
创建时间:2022-04-15 15:34:19
Badger
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • itt0918
    专家
戳我,来吐槽~