glibc读写锁的原理

加读锁:
int __pthread_rwlock_rdlock(pthread_rwlock_t* rwlock);
(nptl/pthread_rwlock_rdlock.c)

加写锁:
int __pthread_rwlock_wrlock(pthread_rwlock_t* rwlock)
(nptl/pthread_rwlock_wrlock.c)

解锁:
int __pthread_rwlock_unlock(pthread_rwlock_t* rwlock)
(nptl/pthread_rwlock_wrlock.c)

默认读写锁是读优先的,如果有写者在等待锁,不会阻塞读者。
如果配置为写优先,如果有写者在等待锁,所有的后续的读者加锁都会阻塞。
但是当占有读者锁的读者数为0时,当有写者在等待时,会优先唤醒写者。
当配置为读者优先时,会出现写者饿死的情况
当配置为写者优先时,会出现读者饿死的情况
读写锁适用于读多写少的场景

加读锁源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
  int __pthread_rwlock_rdlock(pthread_rwlock_t* rwlock)
{
...
//进入临界区
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
while (1)
{
if (rwlock->__data.__writer == 0
&& (!rwlock->__data.__nr_writers_queued || PTHREAD_RWLOCK_PREFER_READER_P (rwlock)))
{//当写者数为0,或者有写者在等待锁,但是配置的是读优先,则可以抢占,否则阻塞读者
if (__builtin_expect (++rwlock->__data.__nr_readers == 0, 0))
{//读者数+1,当超过上限溢出时,则返回EAGAIN
--rwlock->__data.__nr_readers;
result = EAGAIN;
break;
}
else
LIBC_PROBE (rdlock_acquire_read, 1, rwlock);
break;
}
....
if (__buildtin_expect (++rwlock->__data.__nr_readers_queued == 0, 0)) //排队读者数+1
{//读者等待数+1,当溢出时,则返回EAGAIN
--rwlock->__data.__nr_readers_queued;
result = EAGAIN;
break;
}
....
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
lll_futex_wait(&rwlock->__data.__readers_wakeup, waitval, rwlock->__data.shared); //等待唤醒
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
--rwlock->__data.__nr_readers_queued; //排队读者数-1
}//end while
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
}

加写锁源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int __pthread_rwlock_wrlock (pthread_rwlock_t* rwlock)
{
.....
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
while (1)
{
if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0)
{//写者数为0,读者数为0,加写锁成功
LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
break;
}
.....
if (++rwlock->__data.__nr_writers_queued == 0)
{//写者等待数+1,当溢出时,则返回EAGAIN
--rwlock->__data.__nr_writers_queued;
result = EAGAIN;
break;
}
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
lll_futex_wait(&rwlock->__data.__readers_wakeup, waitval, rwlock->__data.shared); //等待唤醒
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
--rwlock->__data.__nr_writers_queued; //排队写者数-1
}
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
}

解锁源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
  int __pthread_rwlock_unlock (pthread_rwlock_t* rwlock)
{
......
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
if (rwlock->__data.__writer) //写者数不为0
rwlock->__data.__writer = 0;
else //读者数不为0
--rwlock->__data.__nr_readers;

if (rwlock->__data.__nr_readers == 0) //没有读者
{
if (rwlock->__data.__nr_writers_queued) //有写者在等待
{
++rwlock->__data.__writer_wakeup; //唤醒写者
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
lll_futex_wake (&rwlock->__data.__writer_wakeup, 1,
rwlock->__data.__shared);
return 0;
}
else if (rwlock->__data.__nr_readers_queued) //有读者等待
{
++rwlock->__data.__readers_wakeup; //唤醒读者
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
lll_futex_wake (&rwlock->__data.__readers_wakeup, INT_MAX,
rwlock->__data.__shared);
return 0;
}
}
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);

}

内核读写自旋锁

linux内核的读写自旋锁也是读优先的。
以2.6.18内核的实现为例

加读锁

1
2
3
4
5
6
int __lockfunc generic__raw_read_trylock(raw_rwlock_t* lock)
|—>__raw_read_lock(lock)
|—>__build_read_lock(rw, "__read_lock_failed");
|—>__build_read_lock_const(rw, "__read_lock_failed")
当lock初始化时,初始化为RW_LOCK_BIAS(0x01000000),所以内核读写锁最多支持001000000个读者)
__build_read_lock_const实现:
1
2
3
4
5
6
7
8
#define __build_read_lock_const(rw, helper) \
asm volatile(LOCK_PREFIX " subl $1,%0\n\t" \ //锁总线,rw-1
"jns 1f\n" \ 如果非负,加读锁成功,跳转到1(因为加写锁时,会直接rw - RW_LOCK_BIAS,所以再减1会变为负数)
"pushl %%eax\n\t" \
"leal %0,%%eax\n\t" \
"call " helper "\n\t" \ 如果为负,加读锁失败,跳转到__read_lock_failed
"popl %%eax\n\t" \
"1:\n" \

__read_lock_failed实现

1
2
3
4
5
6
7
8
9
10
11
12
13
  asm(
".section .sched.text\n"
".align 4\n"
".globl __read_lock_failed\n"
"__read_lock_failed:\n\t"
LOCK_PREFIX "incl (%eax)\n"
"1: rep; nop\n\t"
"cmpl $1,(%eax)\n\t" //循环等待,直到rw变为0x01000000
"js 1b\n\t"
LOCK_PREFIX "decl (%eax)\n\t"
"js __read_lock_failed\n\t"
"ret"
);

解读锁

1
2
3
4
5
6
7
_read_unlock()
|—>__raw_read_unlock()

static inline void __raw_read_unlock(raw_rwlock_t *rw)
{
asm volatile(LOCK_PREFIX "incl %0" :"+m" (rw->lock) : : "memory");
}

加写锁

1
2
3
4
5
6
__write_trylock(rwlock_t* lock)
|—>__raw_write_lock(lock)

|—>__build_write_lock(rw, "__write_lock_failed")

|—>__build_write_lock_const(rw, "__writer_lock_failed");

__build_write_lock_const实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#define __build_write_lock_const(rw, helper) \

asm volatile(LOCK_PREFIX " subl $" RW_LOCK_BIAS_STR ",%0\n\t" \ //rw - RW_LOCK_BIAS

"jz 1f\n" \ //等于0,加锁成功

"pushl %%eax\n\t" \

"leal %0,%%eax\n\t" \

"call " helper "\n\t" \ 不成功,跳转__writer_lock_failed

"popl %%eax\n\t" \

"1:\n" \

:"+m" (*(volatile int *)rw) : : "memory")

__write_lock_failed实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
asm(

".section .sched.text\n"

".align 4\n"

".globl __write_lock_failed\n"

"__write_lock_failed:\n\t"

LOCK_PREFIX "addl $" RW_LOCK_BIAS_STR ",(%eax)\n"

"1: rep; nop\n\t"

"cmpl $" RW_LOCK_BIAS_STR ",(%eax)\n\t"

"jne 1b\n\t"

LOCK_PREFIX "subl $" RW_LOCK_BIAS_STR ",(%eax)\n\t" //直到rw-RW_LOCK_BIAS_STR=0,加锁成功,否则循环行等待

"jnz __write_lock_failed\n\t"

"ret"

);

解写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
_write_unlock()

|–>__raw_write_unlock(lock)


static inline void __raw_write_unlock(raw_rwlock_t *rw)

{

asm volatile(LOCK_PREFIX "addl $" RW_LOCK_BIAS_STR ", %0"

: "+m" (rw->lock) : : "memory"); //rw + 1

}

参考文章:

1)linux下写优先的读写锁的设计:http://www.ibm.com/developerworks/cn/linux/l-rwlock_writing/

Comments

2014-05-29