MIT 6.S081 - Lab Lock - Buffer cache (2) timestamp 单项优化

关键字:double check、乐观锁

代码传送门:OS_Learn/bio_timestamp.c at main · xuanhao44/OS_Learn (github.com)

【仅包含部分代码,不能运行请自行查看文章补充相关内容】

1 ticks 使用

时间戳可通过 kernel/trap.c 中的 ticks 获得(ticks 已在 kernel/def.h 中声明,kernel/bio.c 中可直接使用)。

kernel/def.h(部分截取)

// trap.c
extern uint     ticks;
void            trapinit(void);
void            trapinithart(void);
extern struct spinlock tickslock;
void            usertrapret(void);

使用 ticks 需要取得 tickslock 自旋锁。

使用例子:kernel/sysproc.c(部分截取)

// return how many clock tick interrupts have occurred
// since start.
uint64
sys_uptime(void)
{
  uint xticks;

  acquire(&tickslock);
  xticks = ticks;
  release(&tickslock);
  return xticks;
}

2 timestamp 的意义

timestamp 记录的是什么时间?——是缓存块最后一次被访问的时间。那么依照原来的设计,buf 的 timestamp 被更新的时间应该在 brelse 中(当 refcnt == 0 时)。

当然,也有人在 bget 的时候更新 timestamp。这也是可以的。上层文件系统在访问一个缓存块,这个完整的过程是必有 bget 和 brelse 的。从这一点上看,他们都能表示 "最后一次被访问"。

本篇的实现选择在 brelse 中更新。

3 来自指导书的 hint 理解

  • hint1:移除空闲缓存块列表(bcache.head)

原始的设计维护了一个 LRU 的链表,对这种有序的维护主要是由 brelse 完成的:每当 refcnt == 0 的时候,brelse 就会把 buf 移动到 most recent used 端,这样便一直维持了 LRU 的链表。LRU 链表就是以缓存块最后一次被访问的时间为标志的有序的结构。

移除 head 意味着什么?意味着用链表来高效维护的,以 brelse 驱动的设计不再被使用了。我们该如何利用 来构建新的设计呢?

这个问题有点大,不太好三两句话说完。下面的设计将会一步步说明。

让我们先移除这个 head 吧,同时我们也不需要 prev 和 next 了,加上 timestamp

显然所有函数都需要大改了,我们的改造才刚刚开始。

  • hint2:此项改动可使 brelse 不再需要锁上 bcache lock

原本的设计,brelse 不仅改变了链表的结构,也改变了某个元素的值(b->refcnt--),于是需要给这些操作加上 bcache lock

如果不加上 bcache lock,又没有其他措施的话,是无法满足 brelse 的要求的——据我们上面的分析,brelse 中需要 b->refcnt--,还需要 b->timestamp = ticks。怎么办呢?

答案是:为 buf 添加一个新的自旋锁。在对 buf 进行操作的时候,便加上这个自旋锁,从而不使用 bcache lock

所以我们现在对 buf 的定义修正为:

kernel/buf.h

struct buf {
  struct spinlock spinlock;
  struct sleeplock sleeplock;
  int valid;   // has data been read from disk?
  int disk;    // does disk "own" buf?
  uint dev;
  uint blockno;
  uint refcnt;
  uint timestamp;
  uchar data[BSIZE];
};

为了避免把自旋锁和睡眠锁搞混,改变了其名称。

4 改进 part1:binitbreadbrelsebpinbunpin

binit 很好说,把所有的锁初始化。因为不需要维护链表了,所以少了很多填充链表的代码。注意要以 bache 开头,不然测试时会出问题。

void binit(void)
{
  struct buf *b;

  initlock(&bcache.lock, "bcache");
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    // 不能在自旋锁保护的临界区中使用睡眠锁,但是可以在睡眠锁保护的临界区中使用自旋锁。
    // 总之就是 sleeplock 要在外面,或者没关系
    initsleeplock(&b->sleeplock, "bcache.buffer.sleeplock");
    initlock(&b->spinlock, "bcache.buffer.spinlock");
  }
}

brelse 不再需要锁上 bcache lock,检测到 refcnt == 0 的时候更新 timestamp。至于 releasesleep(&b->sleeplock); 是放在 spinlock 之前还是之后,应该是都可以的。

// Release a locked buffer.
void brelse(struct buf *b)
{
  if (!holdingsleep(&b->sleeplock))
    panic("brelse");
  
  releasesleep(&b->sleeplock);

  acquire(&b->spinlock);
  b->refcnt--;
  if (b->refcnt == 0)
  {
    acquire(&tickslock);
    b->timestamp = ticks;
    release(&tickslock);
  }
  release(&b->spinlock);
}

对于 bpinbunpin,它们的锁可以改为 buf 的 spinlock。

void bpin(struct buf *b)
{
  acquire(&b->spinlock);
  b->refcnt++;
  release(&b->spinlock);
}

void bunpin(struct buf *b)
{
  acquire(&b->spinlock);
  b->refcnt--;
  release(&b->spinlock);
}

bwrite 不用修改逻辑;bread 的逻辑修改有待考虑(也许可以在修改 buf 的时候加上 spinlock 锁)。

// Return a locked buf with the contents of the indicated block.
struct buf *
bread(uint dev, uint blockno)
{
  struct buf *b;

  b = bget(dev, blockno);
  // if not cached, valid will be set to 0
  if (!b->valid)
  {
    virtio_disk_rw(b, 0);
    b->valid = 1;
  }
  return b;
}

// Write b's contents to disk.  Must be locked.
void bwrite(struct buf *b)
{
  if (!holdingsleep(&b->sleeplock))
    panic("bwrite");
  virtio_disk_rw(b, 1);
}

5 改进 part2:重头戏 bget

bget 显然是最重要的,它是磁盘缓存的核心。

如果简单的想,bget 需要被 bcache.lock 所保护。

在其中对每个 buf 的操作中,考虑到安全性,我们理所应当的加上其 spinlock 再进行处理。

5.1 严格检查

static struct buf *
bget(uint dev, uint blockno)
{
  struct buf *b;

  acquire(&bcache.lock);

  // Is the block already cached?
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    acquire(&b->spinlock);
    if (b->dev == dev && b->blockno == blockno)
    {
      b->refcnt++; // 该块被引用次数 ++
      release(&b->spinlock);
      release(&bcache.lock);
      acquiresleep(&b->sleeplock);
      return b;
    }
    else
      release(&b->spinlock);
  }

  // Not cached.
  // **Recycle** the least recently used (LRU) unused buffer.
  uint time_least = 0xffffffff;
  struct buf *lrub = (void*) 0;
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    acquire(&b->spinlock);
    // refcnt == 0 means unused
    if (b->refcnt == 0 && b->timestamp < time_least)
    {
      time_least = b->timestamp;
      lrub = b;
    }
    release(&b->spinlock);
  }
  
  acquire(&lrub->spinlock);
  if (lrub != (void*) 0)
  {
    lrub->dev = dev;
    lrub->blockno = blockno;
    lrub->valid = 0; // set valid 0, wait for virtio_disk_rw()
    lrub->refcnt = 1;
    release(&lrub->spinlock);
    release(&bcache.lock);
    acquiresleep(&lrub->sleeplock);
    return lrub;
  }
  else
  {
    release(&lrub->spinlock);
    release(&bcache.lock);
  }

  panic("bget: no buffers");
}

可以过 usertests,也就是说安全性可以保证;

但是不能过 bcachetest,下面是输出结果:

$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmems0: #fetch-and-add 0 #acquire() 32980
lock: kmems1: #fetch-and-add 0 #acquire() 76
lock: kmems2: #fetch-and-add 0 #acquire() 25
lock: bcache: #fetch-and-add 1596487 #acquire() 32924
--- top 5 contended locks:
lock: bcache: #fetch-and-add 1596487 #acquire() 32924
lock: virtio_disk: #fetch-and-add 90355 #acquire() 1182
lock: proc: #fetch-and-add 56558 #acquire() 77713
lock: proc: #fetch-and-add 17000 #acquire() 77361
lock: proc: #fetch-and-add 14397 #acquire() 77361
tot= 1596487
test0: FAIL
start test1
test1 OK

可以看到,锁争用的情况是相当严重的。远比原本的设计严重。

5.2 double check

为了改善上面严格的设计,我们进行进一步优化。

采用 “不安全 check + 加锁 double check” 的处理模式:利用某些可以允许的不安全来换取缩小的锁范围,然后再使用额外的手段保证正确性。

static struct buf *
bget(uint dev, uint blockno)
{
  struct buf *b;

  acquire(&bcache.lock);

  // Is the block already cached?
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    // unsafe check
    if (b->dev == dev && b->blockno == blockno)
    {
      // acquire lock
      acquire(&b->spinlock);
      // double check
      if (b->dev == dev && b->blockno == blockno)
      {
        b->refcnt++; // 该块被引用次数 ++
        release(&b->spinlock);
        release(&bcache.lock);
        acquiresleep(&b->sleeplock);
        return b;
      }
      else
        release(&b->spinlock);
    }
  }

  // Not cached.
  // **Recycle** the least recently used (LRU) unused buffer.
  uint time_least = 0xffffffff;
  struct buf *lrub = (void *)0;
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {

    // refcnt == 0 means unused
    if (b->refcnt == 0 && b->timestamp < time_least)
    {
      acquire(&b->spinlock);
      if (b->refcnt == 0 && b->timestamp < time_least)
      {
        time_least = b->timestamp;
        lrub = b;
      }
      release(&b->spinlock);
    }
  }

  if (lrub != (void *)0)
  {
    acquire(&lrub->spinlock);
    if (lrub != (void *)0)
    {
      lrub->dev = dev;
      lrub->blockno = blockno;
      lrub->valid = 0; // set valid 0, wait for virtio_disk_rw()
      lrub->refcnt = 1;
      release(&lrub->spinlock);
      release(&bcache.lock);
      acquiresleep(&lrub->sleeplock);
      return lrub;
    }
    else
    {
      release(&lrub->spinlock);
      release(&bcache.lock);
    }
  }

  panic("bget: no buffers");
}

可以过 usertests,也就是说安全性可以保证;

但是不能过 bcachetest,下面是输出结果:

$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmems0: #fetch-and-add 0 #acquire() 32981
lock: kmems1: #fetch-and-add 0 #acquire() 50
lock: kmems2: #fetch-and-add 0 #acquire() 58
lock: bcache: #fetch-and-add 87859 #acquire() 32924
--- top 5 contended locks:
lock: proc: #fetch-and-add 113056 #acquire() 75793
lock: bcache: #fetch-and-add 87859 #acquire() 32924
lock: virtio_disk: #fetch-and-add 68156 #acquire() 1182
lock: proc: #fetch-and-add 21129 #acquire() 75460
lock: proc: #fetch-and-add 19293 #acquire() 75439
tot= 87859
test0: FAIL
start test1
test1 OK

可以看到,锁争用的情况比严格的情况好多了,但是既不能通过测试,也还是不如原本的设计。

5.3 查找命中不用 bcache.lock 保护(有错误)

有人提出,bget 在查找的时候不需要 bcache.lock,只需要在未命中的时候开始加上 bcache.lock

static struct buf *
bget(uint dev, uint blockno)
{
  struct buf *b;

  // Is the block already cached?
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    acquire(&b->spinlock);
    if (b->dev == dev && b->blockno == blockno)
    {
      b->refcnt++; // 该块被引用次数 ++
      release(&b->spinlock);
      acquiresleep(&b->sleeplock);
      return b;
    }
    else
      release(&b->spinlock);
  }

  acquire(&bcache.lock);

  // Not cached.
  // **Recycle** the least recently used (LRU) unused buffer.
  uint time_least = 0xffffffff;
  struct buf *lrub = (void*) 0;
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    acquire(&b->spinlock);
    // refcnt == 0 means unused
    if (b->refcnt == 0 && b->timestamp < time_least)
    {
      time_least = b->timestamp;
      lrub = b;
    }
    release(&b->spinlock);
  }
  
  acquire(&lrub->spinlock);
  if (lrub != (void*) 0)
  {
    lrub->dev = dev;
    lrub->blockno = blockno;
    lrub->valid = 0; // set valid 0, wait for virtio_disk_rw()
    lrub->refcnt = 1;
    release(&lrub->spinlock);
    release(&bcache.lock);
    acquiresleep(&lrub->sleeplock);
    return lrub;
  }
  else
  {
    release(&lrub->spinlock);
    release(&bcache.lock);
  }

  panic("bget: no buffers");
}

可以过 bcachetest,tot = 0。

$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmems0: #fetch-and-add 0 #acquire() 32955
lock: kmems1: #fetch-and-add 0 #acquire() 118
lock: kmems2: #fetch-and-add 0 #acquire() 22
lock: bcache: #fetch-and-add 0 #acquire() 85
--- top 5 contended locks:
lock: proc: #fetch-and-add 129503 #acquire() 77183
lock: virtio_disk: #fetch-and-add 79408 #acquire() 1194
lock: proc: #fetch-and-add 22881 #acquire() 76830
lock: time: #fetch-and-add 17670 #acquire() 32650
lock: proc: #fetch-and-add 14995 #acquire() 76830
tot= 0
test0: OK
start test1
test1 OK

这真令人开心!

但是,usertests 却失败了,也就是说,正确性没有了——这是简直是晴天霹雳!

先测试 bcachetest,再测试 usertests 会有如下结果。

$ usertests
usertests starting
test manywrites: panic: create: dirlink

先测试 usertests 会有如下结果。

$ usertests
usertests starting
test manywrites: panic: ilock: no type

这种改进到底出了什么错呢?

5.4 安全性处理

5.4.1 安全性处理(1)

5.3 bget 逻辑为:先查找数组是否存在缓存块,存在就返回,不存在就获取 bcache.lock ,从数组中拿一个 refcnt == 0 的缓存块来使用。

问题是你获取 bcache.lock 后没有再次检测是不是缓存块已经存在了,也就是说,之前检测命中的时候没有缓存块,但是再次检测命中的时候有这个缓存块了,你如果不去再次检测,那就不知道这种情况,就会拿一个 LRU 块去指向这个磁盘块——这样数组里面就有两个缓存块指向同一个磁盘块了。

所以,如果到了未命中查找阶段,我们需要在获取 bcache.lock 后再次查找命中。

再次检查的部分好说,那我们该怎么写 ”第一次检测" 呢?

到底是加 bcache.lockb->spinlock ,还是只加 b->spinlock 呢?

一些测试结果:

  • 如果都加,那么就能保证安全性(usertests 能过),但是 test1 过不了,tot 过大;
  • 如果只加 b->spinlock ,那么 tot 等于 0,但是不安全(usertests 报 panic)。

这便使人犯难。我们知道加了 bcache.lock 争用会多,这没问题。可是我们的未查找命中已经添加了再次检测的部分,为什么还是不安全?

5.4.2 安全性处理(2)

来看看第一阶段查找命中只加 b->spinlock 的问题吧。

思考一种情况:未命中寻找 LRU 块时,找到了一个这样的块,正准备替换,但是这个块被另一个进程在第一次无 bcache.lock 查找命中阶段拿走了。

同时注意到,能被拿走的原因正是因为一个很小的逻辑漏洞——遍历寻找 timestamp 最小的缓存块,但是找到这个时间戳最小的块之后应该需要一直拿着它的锁

而之前的实现里,为了方便,我一直忽视了这个细节。因为不太好处理。在找的过程中,你肯定会获取并释放一些块的锁。可是怎么才能知道在哪一次就找到了 timestamp 最小的缓存块,并且不释放它的锁呢?

避免这样做的方法是有的,下面介绍乐观锁。

了解概念:乐观锁和悲观锁。

  • 防止被抢走就是悲观锁,被抢走后重试就是乐观锁。
  • 冲突非常频繁悲观锁快,冲突很少就乐观锁快。
  • 乐观锁的实现相对简单。

我们可以选择悲观锁,也就是一直锁着这个时间戳最小的块;也可以选择乐观锁,允许你被抢走,我到时候再来检查一次,如果确实被抢走了,我就再查找一次。

这里我们使用乐观锁。怎么才是被抢走了呢?那自然是用 refcnt == 0 来判断。如果真的被抢走了,那么 refcnt 的值就不会是 0 了。如何重试呢?我们直接使用 goto 返回到第二阶段开头(大致逻辑)。

整体的判断逻辑应该形如(panic 的位置已经被重新安排):

  if (lrub == (void *)0) // no unused buf, should panic
  {
    release(&bcache.lock);
    panic("bget: no buffers");
  }
  else if (lrub != (void *)0 && lrub->refcnt == 0) // unsafe check
  {
  }
  else // grab by other's stage1
  {
    // release(&bcache.lock); no need
    goto LOOP; // goto stage2
  }
  1. 如果没有找到 lrub(lrub 为空),那么还是需要 panic 的。
  2. 如果找到了 lrub(lrub 不为空),但是被第一阶段抢走了(refcnt 不等于 0),那么再找一遍。
  3. 如果找到了 lrub(lrub 不为空),但是也没被第一阶段抢走(refcnt 等于 0),那么就确定了,可以开始替换。

goto 的 LOOP 位置需要斟酌:

我们可以看到,在需要 goto 的时候,我们都没有 release(&bcache.lock),这样,我们就可以回到 acquire 之后,并不需要先 release 再 acquire。

不仅如此,如果我们不释放这个锁,那么我们可以把 LOOP 放在未命中查找替换前

理由:当前进程没能从数组中查找命中,进入了到未命中查找替换的过程;其他进程就算要访问相同的文件,到数组中查找命中,一样会未命中,同时会被阻挡在 bcache.lock 的外面,直到之前那个进程替换结束,释放 bcache.lock 之后。所以当前进程不用担心会有其他进程能够使得自己在 goto 之后重新查找到命中——因为这个文件根本就还没被缓存。所以在 goto 回来之后,可以不用再查找命中一遍,直接再去数组中找 LRU 块。

并且,你可以想一想为什么 5.4.1 需要那样的处理,我们设想一个场景:

进程 A 查找命中缓存块 BUF,未命中,于是进入第二阶段查找替换;进程 B 也查找一样的缓存块 BUF, 既然 进程 A 当时没有找到,那么进程 B 就更没可能找到,于是进程 B 在完成第一阶段之后就在第二阶段前等着了。

那么进程 A 终于替换了缓存块之后,如果进程 B 还以为这个缓存块不在数组里,那他就会重新去查找替换——这当然是不对的!所以需要再次查找命中。当然,这样的事情只需要在进入第二阶段开始的时候做一次。

bget

static struct buf *
bget(uint dev, uint blockno)
{
  struct buf *b;

  // stage1: 第一次检测, 99% 会命中在第一次检测
  // 但是没有 `bcache.lock` 保护, 它可能会抢走别人的
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    // unsafe check
    if (b->dev == dev && b->blockno == blockno)
    {
      // acquire lock
      acquire(&b->spinlock);
      // double check
      if (b->dev == dev && b->blockno == blockno)
      {
        b->refcnt++; // 该块被引用次数 ++
        release(&b->spinlock);
        acquiresleep(&b->sleeplock);
        return b;
      }
      else
        release(&b->spinlock);
    }
  }

  acquire(&bcache.lock);
  // stage2
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    // unsafe check
    if (b->dev == dev && b->blockno == blockno)
    {
      // acquire lock
      acquire(&b->spinlock);
      // double check
      if (b->dev == dev && b->blockno == blockno)
      {
        b->refcnt++; // 该块被引用次数 ++
        release(&b->spinlock);
        release(&bcache.lock);
        acquiresleep(&b->sleeplock);
        return b;
      }
      else
        release(&b->spinlock);
    }
  }

  uint time_least;
  struct buf *lrub;

LOOP:
  // Not cached.
  // **Recycle** the least recently used (LRU) unused buffer.
  time_least = 0xffffffff;
  lrub = (void *)0;
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    // refcnt == 0 means unused
    if (b->refcnt == 0 && b->timestamp < time_least)
    {
      acquire(&b->spinlock);
      if (b->refcnt == 0 && b->timestamp < time_least)
      {
        time_least = b->timestamp;
        lrub = b;
      }
      release(&b->spinlock);
    }
  }

  if (lrub == (void *)0) // no unused buf, should panic
  {
    release(&bcache.lock);
    panic("bget: no buffers");
  }
  else if (lrub != (void *)0 && lrub->refcnt == 0) // unsafe check
  {
    // acquire lock
    acquire(&lrub->spinlock);
    // double check
    if (lrub != (void *)0 && lrub->refcnt == 0)
    {
      lrub->dev = dev;
      lrub->blockno = blockno;
      lrub->valid = 0; // set valid 0, wait for virtio_disk_rw()
      lrub->refcnt = 1;

      release(&lrub->spinlock);
      release(&bcache.lock);
      acquiresleep(&lrub->sleeplock);
      return lrub;
    }
    else
    {
      release(&lrub->spinlock);
      // release(&bcache.lock); no need
      goto LOOP; // goto second check
    }
  }
  else // grab by other's stage1
  {
    // release(&bcache.lock); no need
    goto LOOP; // goto stage2
  }
}

测试结果:tot = 0;usertests 也过了,完美!

$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmems0: #fetch-and-add 0 #acquire() 33010
lock: kmems1: #fetch-and-add 0 #acquire() 1
lock: kmems2: #fetch-and-add 0 #acquire() 57
lock: kmems3: #fetch-and-add 0 #acquire() 18
lock: kmems7: #fetch-and-add 0 #acquire() 20
lock: bcache: #fetch-and-add 0 #acquire() 85
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2164
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2206
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2342
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 4378
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2089
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2114
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2158
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2540
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2202
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2202
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2194
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2138
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2108
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2070
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2066
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2072
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2063
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2067
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2063
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2063
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2063
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2314
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2104
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2062
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2062
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2070
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2070
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2070
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2070
lock: bcache.buffer.spinlock: #fetch-and-add 0 #acquire() 2070
--- top 5 contended locks:
lock: proc: #fetch-and-add 654904 #acquire() 221450
lock: proc: #fetch-and-add 554628 #acquire() 221806
lock: virtio_disk: #fetch-and-add 468252 #acquire() 1194
lock: proc: #fetch-and-add 360263 #acquire() 221449
lock: proc: #fetch-and-add 351016 #acquire() 221448
tot= 0
test0: OK
start test1
test1 OK

6 总结

为了优化性能以及减少锁争用,我们选择不给查找命中加上大锁,而给未命中查找替换加上大锁。

第二阶段必须是再次查找命中 + 未命中查找替换,这是一个原子的过程。

如果我们在第二阶段不一直持有我们找到的 LRU 块的锁,那么就可能会被第一阶段抢走。解决这个问题的办法是:悲观锁,即一直持有这个锁,这样就不会被抢走;乐观锁,即不一直持有这个锁,但是在使用块时会检测是否被其他进程抢走,如果被抢走那么就再去找。

多从实际的场景考虑,发现潜在的问题和优化点。

double check 的使用是可选的优化项,在不能通过测试的时候,就选用其来进行优化。

7 最终代码

// Buffer cache.
//
// The buffer cache is a linked list of buf structures holding
// cached copies of disk block contents.  Caching disk blocks
// in memory reduces the number of disk reads and also provides
// a synchronization point for disk blocks used by multiple processes.
//
// Interface:
// * To get a buffer for a particular disk block, call bread.
// * After changing buffer data, call bwrite to write it to disk.
// * When done with the buffer, call brelse.
// * Do not use the buffer after calling brelse.
// * Only one process at a time can use a buffer,
//     so do not keep them longer than necessary.

#include "types.h"
#include "param.h"
#include "spinlock.h"
#include "sleeplock.h"
#include "riscv.h"
#include "defs.h"
#include "fs.h"
#include "buf.h"

struct
{
  struct spinlock lock;
  struct buf buf[NBUF];
} bcache;

void binit(void)
{
  struct buf *b;

  initlock(&bcache.lock, "bcache");
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    // 不能在自旋锁保护的临界区中使用睡眠锁,但是可以在睡眠锁保护的临界区中使用自旋锁。
    // 总之就是 sleeplock 要在外面,或者没关系
    initsleeplock(&b->sleeplock, "bcache.buffer.sleeplock");
    initlock(&b->spinlock, "bcache.buffer.spinlock");
  }
}

// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf *
bget(uint dev, uint blockno)
{
  struct buf *b;

  // stage1: 第一次检测, 99% 会命中在第一次检测
  // 但是没有 `bcache.lock` 保护, 它可能会抢走别人的
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    // unsafe check
    if (b->dev == dev && b->blockno == blockno)
    {
      // acquire lock
      acquire(&b->spinlock);
      // double check
      if (b->dev == dev && b->blockno == blockno)
      {
        b->refcnt++; // 该块被引用次数 ++
        release(&b->spinlock);
        acquiresleep(&b->sleeplock);
        return b;
      }
      else
        release(&b->spinlock);
    }
  }

  acquire(&bcache.lock);
  // stage2
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    // unsafe check
    if (b->dev == dev && b->blockno == blockno)
    {
      // acquire lock
      acquire(&b->spinlock);
      // double check
      if (b->dev == dev && b->blockno == blockno)
      {
        b->refcnt++; // 该块被引用次数 ++
        release(&b->spinlock);
        release(&bcache.lock);
        acquiresleep(&b->sleeplock);
        return b;
      }
      else
        release(&b->spinlock);
    }
  }

  uint time_least;
  struct buf *lrub;

LOOP:
  // Not cached.
  // **Recycle** the least recently used (LRU) unused buffer.
  time_least = 0xffffffff;
  lrub = (void *)0;
  for (b = bcache.buf; b < bcache.buf + NBUF; b++)
  {
    // refcnt == 0 means unused
    if (b->refcnt == 0 && b->timestamp < time_least)
    {
      acquire(&b->spinlock);
      if (b->refcnt == 0 && b->timestamp < time_least)
      {
        time_least = b->timestamp;
        lrub = b;
      }
      release(&b->spinlock);
    }
  }

  if (lrub == (void *)0) // no unused buf, should panic
  {
    release(&bcache.lock);
    panic("bget: no buffers");
  }
  else if (lrub != (void *)0 && lrub->refcnt == 0) // unsafe check
  {
    // acquire lock
    acquire(&lrub->spinlock);
    // double check
    if (lrub != (void *)0 && lrub->refcnt == 0)
    {
      lrub->dev = dev;
      lrub->blockno = blockno;
      lrub->valid = 0; // set valid 0, wait for virtio_disk_rw()
      lrub->refcnt = 1;

      release(&lrub->spinlock);
      release(&bcache.lock);
      acquiresleep(&lrub->sleeplock);
      return lrub;
    }
    else
    {
      release(&lrub->spinlock);
      // release(&bcache.lock); no need
      goto LOOP; // goto second check
    }
  }
  else // grab by other's stage1
  {
    // release(&bcache.lock); no need
    goto LOOP; // goto stage2
  }
}

// Return a locked buf with the contents of the indicated block.
struct buf *
bread(uint dev, uint blockno)
{
  struct buf *b;

  b = bget(dev, blockno);
  // if not cached, valid will be set to 0
  if (!b->valid)
  {
    virtio_disk_rw(b, 0);
    b->valid = 1;
  }
  return b;
}

// Write b's contents to disk.  Must be locked.
void bwrite(struct buf *b)
{
  if (!holdingsleep(&b->sleeplock))
    panic("bwrite");
  virtio_disk_rw(b, 1);
}

// Release a locked buffer.
void brelse(struct buf *b)
{
  if (!holdingsleep(&b->sleeplock))
    panic("brelse");

  releasesleep(&b->sleeplock);

  acquire(&b->spinlock);
  b->refcnt--;
  if (b->refcnt == 0)
  {
    acquire(&tickslock);
    b->timestamp = ticks;
    release(&tickslock);
  }
  release(&b->spinlock);
}

void bpin(struct buf *b)
{
  acquire(&b->spinlock);
  b->refcnt++;
  release(&b->spinlock);
}

void bunpin(struct buf *b)
{
  acquire(&b->spinlock);
  b->refcnt--;
  release(&b->spinlock);
}
最后修改:2022 年 11 月 13 日
如果觉得我的文章对你有用,请随意赞赏