文章

MIT 6.S081 | 0x08 Locks

操作系统的一生之敌——并发与锁

Before writing code, make sure to read the following parts from the xv6 book:

  • Chapter 6: “Locking” and the corresponding code.
  • Section 3.5: “Code: Physical memory allocator”
  • Section 8.1 through 8.3: “Overview”, “Buffer cache layer”, and “Code: Buffer cache”

:blue_square: Memory allocator

Your job is to implement per-CPU freelists, and stealing when a CPU’s free list is empty. You must give all of your locks names that start with “kmem”. That is, you should call initlock for each of your locks, and pass a name that starts with “kmem”. Run kalloctest to see if your implementation has reduced lock contention. To check that it can still allocate all of memory, run usertests sbrkmuch. Your output will look similar to that shown below, with much-reduced contention in total on kmem locks, although the specific numbers will differ. Make sure all tests in usertests pass. make grade should say that the kalloctests pass.

分析

xv6 使用 kalloc/kfree 来对内存进行分配和释放,全局只用一把锁进行管理:

1
2
3
4
5
6
7
8
struct run {
  struct run *next;
};

struct {
  struct spinlock lock;
  struct run *freelist;
} kmem;

这种实现简单直接,但是在遇到多核情况下时,对 kmem.lock 这把锁的竞争会很激烈。所以这一 Part 的任务就是想一个办法减少锁的竞争

题目也给出了解决的思路,就是为每个 CPU 分别维护一个 freelist 和对应的锁,这样每个锁的粒度就会变小,锁的竞争也就减小了。整理一下,我们需要做的工作有:

  1. 为每个 CPU 分别维护一个 freelistlock
  2. 当某个 CPU 的 freelist 为空时,从其他 CPU 的 freelist 中偷取一些

第一部分直接就能写出来,将 kmem 这个结构体变成一个数组就行,CPU 的数量已经用宏定义好了:

1
2
3
4
struct {
  struct spinlock lock;
  struct run *freelist;
} kmem[NCPU];

这样每个 CPU 都有了对应的 freelistlock,在 kalloc/kfree 的时候,使用 cpuid() 获取对应的 CPU 号进行操作就好。剩下就是第二个部分的工作了:当一个 CPU 的 freelist 为空时,怎么从其他的 CPU 那里“偷”一些空间出来?

这个时候就要遍历 kmem 的其他 freelist 了,在遍历的时候记得跳过自己的 cpuid,在读取每个 CPU 的 freelist 时也要记得加锁。具体的细节可以看下面的代码。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
diff --git a/kernel/kalloc.c b/kernel/kalloc.c
index 0699e7e..ce21891 100644
--- a/kernel/kalloc.c
+++ b/kernel/kalloc.c
@@ -9,6 +9,8 @@
 #include "riscv.h"
 #include "defs.h"
 
+#define TOTAL_STEAL 64
+
 void freerange(void *pa_start, void *pa_end);
 
 extern char end[]; // first address after kernel.
@@ -21,12 +23,25 @@ struct run {
 struct {
   struct spinlock lock;
   struct run *freelist;
-} kmem;
+} kmem[NCPU];
+
+char *kmem_names[] = {
+  "kmem_lock_0",
+  "kmem_lock_1",
+  "kmem_lock_2",
+  "kmem_lock_3",
+  "kmem_lock_4",
+  "kmem_lock_5",
+  "kmem_lock_6",
+  "kmem_lock_7",
+};
 
 void
 kinit()
 {
-  initlock(&kmem.lock, "kmem");
+  for (int i = 0; i < NCPU; i++) {
+    initlock(&kmem[i].lock, kmem_names[i]);
+  }
   freerange(end, (void*)PHYSTOP);
 }
 
@@ -35,8 +50,10 @@ freerange(void *pa_start, void *pa_end)
 {
   char *p;
   p = (char*)PGROUNDUP((uint64)pa_start);
+  push_off();
   for(; p + PGSIZE <= (char*)pa_end; p += PGSIZE)
     kfree(p);
+  pop_off();
 }
 
 // Free the page of physical memory pointed at by pa,
@@ -56,10 +73,13 @@ kfree(void *pa)
 
   r = (struct run*)pa;
 
-  acquire(&kmem.lock);
-  r->next = kmem.freelist;
-  kmem.freelist = r;
-  release(&kmem.lock);
+  push_off();
+  int cpu_id = cpuid();
+  acquire(&kmem[cpu_id].lock);
+  r->next = kmem[cpu_id].freelist;
+  kmem[cpu_id].freelist = r;
+  release(&kmem[cpu_id].lock);
+  pop_off();
 }
 
 // Allocate one 4096-byte page of physical memory.
@@ -70,11 +90,37 @@ kalloc(void)
 {
   struct run *r;
 
-  acquire(&kmem.lock);
-  r = kmem.freelist;
-  if(r)
-    kmem.freelist = r->next;
-  release(&kmem.lock);
+  push_off();
+  int cpu_id = cpuid();
+
+  acquire(&kmem[cpu_id].lock);
+  if (!kmem[cpu_id].freelist) {
+    // no enough space, try to steal space from other cpu
+    int list_left = TOTAL_STEAL;
+    for (int i = 0; i < NCPU; i++) {
+      if (i == cpu_id) continue;
+      acquire(&kmem[i].lock);
+      struct run *r_other_cpu = kmem[i].freelist;
+      while (r_other_cpu && list_left) {
+        // find free space, steal!
+        kmem[i].freelist = r_other_cpu->next;
+        r_other_cpu->next = kmem[cpu_id].freelist;
+        kmem[cpu_id].freelist = r_other_cpu;
+        r_other_cpu = kmem[i].freelist;
+        list_left--;
+      }
+      release(&kmem[i].lock);
+      if (list_left == 0) break;
+    }
+  }
+  release(&kmem[cpu_id].lock);
+
+  r = kmem[cpu_id].freelist;
+  if(r) {
+    kmem[cpu_id].freelist = r->next;
+  }
+
+  pop_off();
 
   if(r)
     memset((char*)r, 5, PGSIZE); // fill with junk

:red_square: Buffer cache

Modify the block cache so that the number of acquire loop iterations for all locks in the bcache is close to zero when running bcachetest. Ideally the sum of the counts for all locks involved in the block cache should be zero, but it’s OK if the sum is less than 500. Modify bget and brelse so that concurrent lookups and releases for different blocks that are in the bcache are unlikely to conflict on locks (e.g., don’t all have to wait for bcache.lock). You must maintain the invariant that at most one copy of each block is cached. When you are done, your output should be similar to that shown below (though not identical). Make sure usertests still passes. make grade should pass all tests when you are done.

分析

操作系统在读取硬盘数据时,会将读取到的数据缓存到内存中,这样下次再读取相同的数据时就可以直接从内存中读取,而不用再次从硬盘中读取。这个缓存就是缓冲区缓存(Buffer Cache),在 xv6 中是通过 bcache 这个结构体来实现的:

1
2
3
4
5
6
7
8
struct {
  struct spinlock lock;
  struct buf buf[NBUF];

  // Linked list of all buffers, through prev/next.
  // head.next is most recently used.
  struct buf head;
} bcache;

这个 bcache 是真的在多 CPU 多进程中共享的,全局只有一个锁:bcache.lock,锁竞争很激烈。提示有说用哈希表来减少竞争,其实就是一个大锁变成多个小锁,先设置 NBUCKET 个桶,每个桶对应一个锁,然后在 bgetbrelse 中对这些锁进行操作。下面是修改后的 bcache 结构体:

1
2
3
4
5
6
7
8
9
struct {
  struct spinlock lock;
  struct buf buf[NBUF];

  // Linked list of all buffers, through prev/next.
  // head.next is most recently used.
  struct buf head[NBUCKET];
  struct spinlock head_lock[NBUCKET];
} bcache;

原先只有一个 bcache.head,LRU 算法实现起来很方便。但是在有多个桶的时候,LRU 算法就比较捉急,因为每个桶都可能有一个最少使用的块,或者全局最少使用的块和当前操作的桶不在一个桶中,逻辑很复杂。提示也贴心地指出了如何解决:直接记录 buf 上次使用的 ticks 即可。找到全局 ticks 最少的那个 buf,然后将其放到当前桶的最前面。

最后很可能出问题的地方就是如何保证不发生死锁,这里有个博客讨论了这个问题,可以仔细去看看。总之一定要仔细考虑每个锁的顺序,不要出现循环等待的情况。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
diff --git a/kernel/bio.c b/kernel/bio.c
index 60d91a6..255d23e 100644
--- a/kernel/bio.c
+++ b/kernel/bio.c
@@ -23,32 +23,57 @@
 #include "fs.h"
 #include "buf.h"
 
+#define NBUCKET 13
+
+extern uint ticks;
+
 struct {
-  struct spinlock lock;
   struct buf buf[NBUF];
 
   // Linked list of all buffers, through prev/next.
   // Sorted by how recently the buffer was used.
   // head.next is most recent, head.prev is least.
-  struct buf head;
+  struct buf head[NBUCKET];
+  struct spinlock head_lock[NBUCKET];
 } bcache;
 
+char * head_lock_name[] = {
+  "bcache_head_lock_0",
+  "bcache_head_lock_1",
+  "bcache_head_lock_2",
+  "bcache_head_lock_3",
+  "bcache_head_lock_4",
+  "bcache_head_lock_5",
+  "bcache_head_lock_6",
+  "bcache_head_lock_7",
+  "bcache_head_lock_8",
+  "bcache_head_lock_9",
+  "bcache_head_lock_10",
+  "bcache_head_lock_11",
+  "bcache_head_lock_12",
+};
+
 void
 binit(void)
 {
   struct buf *b;
 
-  initlock(&bcache.lock, "bcache");
-
   // Create linked list of buffers
-  bcache.head.prev = &bcache.head;
-  bcache.head.next = &bcache.head;
-  for(b = bcache.buf; b < bcache.buf+NBUF; b++){
-    b->next = bcache.head.next;
-    b->prev = &bcache.head;
-    initsleeplock(&b->lock, "buffer");
-    bcache.head.next->prev = b;
-    bcache.head.next = b;
+  for (int i = 0; i < NBUCKET; i++) {
+    initlock(&bcache.head_lock[i], head_lock_name[i]);
+    bcache.head[i].prev = &bcache.head[i];
+    bcache.head[i].next = &bcache.head[i];
+  }
+  for (int i = 0; i < NBUF; i++) {
+    uint hash_idx = i % NBUCKET;
+    b = &bcache.buf[i];
+
+    b->next = bcache.head[hash_idx].next;
+    b->prev = &bcache.head[hash_idx];
+    b->last_use = ticks;
+    initsleeplock(&bcache.buf[i].lock, "buffer");
+    bcache.head[hash_idx].next->prev = b;
+    bcache.head[hash_idx].next = b;
   }
 }
 
@@ -59,32 +84,67 @@ static struct buf*
 bget(uint dev, uint blockno)
 {
   struct buf *b;
+  uint min_last_use;
+
+  uint hash_idx = blockno % NBUCKET;
 
-  acquire(&bcache.lock);
+  acquire(&bcache.head_lock[hash_idx]);
 
   // Is the block already cached?
-  for(b = bcache.head.next; b != &bcache.head; b = b->next){
+  for(b = bcache.head[hash_idx].next; b != &bcache.head[hash_idx]; b = b->next){
     if(b->dev == dev && b->blockno == blockno){
       b->refcnt++;
-      release(&bcache.lock);
+      b->last_use = ticks;
+      release(&bcache.head_lock[hash_idx]);
       acquiresleep(&b->lock);
       return b;
     }
   }
 
   // Not cached.
-  // Recycle the least recently used (LRU) unused buffer.
-  for(b = bcache.head.prev; b != &bcache.head; b = b->prev){
-    if(b->refcnt == 0) {
+  // Recycle the least recently used unused buffer.
+  min_last_use = __UINT32_MAX__;
+  b = 0;
+  for (int i = 0; i < NBUF; i++) {
+    if (bcache.buf[i].refcnt == 0 && min_last_use > bcache.buf[i].last_use) {
+      min_last_use = bcache.buf[i].last_use;
+      b = &bcache.buf[i];
+    }
+  }
+
+  if (b != 0) {
+    // Find an available buffer
+    uint no = b->blockno;
+    if (no % NBUCKET == hash_idx) {
       b->dev = dev;
       b->blockno = blockno;
       b->valid = 0;
       b->refcnt = 1;
-      release(&bcache.lock);
+      b->last_use = ticks;
+      release(&bcache.head_lock[hash_idx]);
       acquiresleep(&b->lock);
       return b;
     }
+    acquire(&bcache.head_lock[no % NBUCKET]);
+    b->dev = dev;
+    b->blockno = blockno;
+    b->valid = 0;
+    b->refcnt = 1;
+    b->last_use = ticks;
+    // Steal!
+    b->next->prev = b->prev;
+    b->prev->next = b->next;
+    release(&bcache.head_lock[no % NBUCKET]);
+
+    b->next = bcache.head[hash_idx].next;
+    b->prev = &bcache.head[hash_idx];
+    bcache.head[hash_idx].next->prev = b;
+    bcache.head[hash_idx].next = b;
+    release(&bcache.head_lock[hash_idx]);
+    acquiresleep(&b->lock);
+    return b;
   }
+
   panic("bget: no buffers");
 }
 
@@ -121,33 +181,29 @@ brelse(struct buf *b)
 
   releasesleep(&b->lock);
 
-  acquire(&bcache.lock);
+  uint hash_idx = b->blockno % NBUCKET;
+  acquire(&bcache.head_lock[hash_idx]);
   b->refcnt--;
   if (b->refcnt == 0) {
-    // no one is waiting for it.
-    b->next->prev = b->prev;
-    b->prev->next = b->next;
-    b->next = bcache.head.next;
-    b->prev = &bcache.head;
-    bcache.head.next->prev = b;
-    bcache.head.next = b;
+    b->last_use = ticks;
   }
-  
-  release(&bcache.lock);
+  release(&bcache.head_lock[hash_idx]);
 }
 
 void
 bpin(struct buf *b) {
-  acquire(&bcache.lock);
+  uint hash_idx = b->blockno % NBUCKET;
+  acquire(&bcache.head_lock[hash_idx]);
   b->refcnt++;
-  release(&bcache.lock);
+  release(&bcache.head_lock[hash_idx]);
 }
 
 void
 bunpin(struct buf *b) {
-  acquire(&bcache.lock);
+  uint hash_idx = b->blockno % NBUCKET;
+  acquire(&bcache.head_lock[hash_idx]);
   b->refcnt--;
-  release(&bcache.lock);
+  release(&bcache.head_lock[hash_idx]);
 }
 
 
diff --git a/kernel/buf.h b/kernel/buf.h
index 4616e9e..635ec1e 100644
--- a/kernel/buf.h
+++ b/kernel/buf.h
@@ -5,6 +5,7 @@ struct buf {
   uint blockno;
   struct sleeplock lock;
   uint refcnt;
+  uint last_use;
   struct buf *prev; // LRU cache list
   struct buf *next;
   uchar data[BSIZE];

其他

这次的实验在最后 make grade 的时候又出了问题:kalloctest 超时,然后 usertests 中的 textwrite 一直过不了。前面这个问题我一开始是在 kalloc 中一次只偷 1 页的内容,后来改成了一次偷 64 页,然后还是超时了,在网上搜了搜其他人的实现1,原封不动抄过来,还是超时 :weary:

textwrites 的那个测试一直过不了,下面是这个测试的部分内容:

1
2
3
4
5
6
7
8
  ...
  pid = fork();
  if(pid == 0) {
    volatile int *addr = (int *) 0;
    *addr = 10;
    exit(1);
  } else if(pid < 0){
  ...

这段测试用于验证操作系统内核是否正确实现了对程序试图向只读(通常是文本段,存放代码)内存区域写入数据的行为的保护。这种保护通常通过引发一个段错误(segmentation fault)来实现,这是大多数现代操作系统为了防止程序破坏自身或其他程序的一种安全机制。可是我没有通过。就很烦,转到其他分支进行测试,居然都没通过 :joy:(在做 COW 的时候就发现了),暂时还没想好咋该 太菜了呜呜呜,之后再看吧。


本文由作者按照 CC BY 4.0 进行授权