0%

并行计算-并行原语

一、总论

在并行编程中,除了“互斥”这个原语外(一般是采用“锁”来实现),还存在“同步(synchronous)”这个原语,这个原语指的是“各个事件按照特定顺序执行”的需求。比如说在生产者消费者模型中,必须要让生产者先生产,消费者才可以进行消费,这就是一种同步行为,是没有办法仅仅依靠锁来实现的。

当涉及互斥时,涉及到的术语是 lock, unlock, mutex ,而涉及到同步是术语是 wait, signal, notify, barrier 等。之前我只重视了互斥的学习,而对于同步语义,则非常忽视。


二、互斥

上锁是维护互斥性的方式,只有拿到锁的线程,可以访问关键区域。

2.1 自旋锁

自旋锁(Spinlock)是最为朴素的锁,它的形式是这样的:

while (!condition); // test lock
condition = false; // lock

// do some critical thing...

condition = true; // unlock

只要不满足 condition,那么就会一直停留在 while 语句中执行。所谓的“自旋”,指的就是“自我重复循环”的意思。

2.2 互斥锁

互斥锁(mutex,mutual exclusive)的形式如下:

while (!condition) yield(); // test lock
condition = false; // lock

// do some critical thing...

condition = true; // unlock

一旦条件检验不通过,自旋锁会立刻进行下一次条件检验,而互斥锁则会 yield,将执行权让给其他线程。自旋锁的设计,是在自旋的过程中依然占据资源,其优势在于并不会引入额外的上下文切换开销,当自旋时间不多的时候(也就是关键区比较短),那么用自旋锁比较合适。而如果关键区比较长,切换上下文开销小于资源占用的开销,那么就用互斥锁更为合适。

2.3 CAS

上述的锁的实现都是“伪代码”,如果是 C 代码的话,是会存在问题的。考虑这样一个情形,此时有 A 和 B 都在等待 condition 发生变化,一旦 condition 变成 true,A 从 while 中离开,然后在“ A 设置 condition = false 来阻止其他的线程访问”这个时刻之前,突然调度器将 A 挂起,将 B 执行,此时 conditon 已然是 true,所以 B 也可以进入关键区,此时有两个线程在关键区,那么就发生了错误。

从上述描述中可以看出,发生错误的核心在于“检测 condition”和“设置 condition”这两个行为是没有绑定在一起的,一旦上下文切换发生在二者之间,就会导致发生错误。我们可以在硬件层面上将这两个行为统一成一个“原子行为”,一个常见的实现就是 CAS(Compare And Swap),其伪代码如下:

int CompareAndSwap(int *ptr, int expected, int new) {
  int old = *ptr;
  if(old == expected) {
    *ptr = new;
  }
  return old;
}

可以看到这个行为基本上就是检验旧值是否符合预期,如果符合预期就将旧值修改为新值,否则保持不变。此时就可以轻易实现一个自旋锁:

void lock(lock_t *lock) {
  while(CompareAndSwap(&lock->condition, true, false) == false);
}

void unlock(lock_t *lock) {
  lock->condition = true;
}

突然想起来,lock/unlock 之间的关键区,也可以看作是原子性的,没有任何其他的指令可以插手这个区域。锁机制可以看作是对 CAS 原子性的一种拓展,原先只有一条指令是原子性的,现在是一个区域内的所有指令都是原子性的了。

我们设计关键区的时候,除了“共享”外,还可以从“不被打断”的角度去设计。


三、同步

3.1 条件变量

可以看到上述描述的互斥原语和锁机制,往往是发生在身份相同的线程中的,它们往往执行同一段代码,履行相同的职责,所以需要互斥机制来保证独立性。不过线程并不只有“竞争者”这一个身份,还有可能是”协作者“,此时他们就需要”同步“机制来确保他们的协同性。

同步机制比较简单的一个版本就是“先 21”,其中 12 是两个不同的线程,如果仅有锁机制,我们没法保证执行的顺序问题。但是如果当 1 先执行的时候,让它先 wait 一下 2 ,直到 2 执行完了以后 signal 提醒一下它,它再加入就绪队列。其中 1 的“等待室”,就被称为条件变量(Condition Variable)。

基于这种思想,我们有了第一版代码:

void *f1(void *arg)
{
	pthread_cond_wait(&cond, &mutex);
	printf("I am f1\n");
}

void *f2(void *arg)
{
	printf("I am f2\n");
	pthread_cond_signal(&cond)}

需要注意此时的 cond_wait 和 mutex 的 yield 不同,yield 是将线程加入到 OS 的待调度队列中,如果队列中没有其他线程了,那么等待锁的线程还是会反复被调用,而 cond_wait 是一个独立的等待队列,如果不被 signal/notify ,那么就永远不会被调度。这就导致在同步的应用中,常常出现有些线程在等待室里“醒不过来”的情况。

然后我们就会发现一个问题,就是如果 2 先执行完成呢?那么 1 就会一直 wait 下去,因为 signal 信号早就执行完了,这个问题被称为“唤醒丢失”。让我们再回顾需求,会发现其实 1 只有在 2 还没有执行的时候,才需要 wait ,如果本来就是 2 先执行,那么就无须 wait 了,我们可以用一个共享变量来指示是否线程 1 已经执行完了,于是我们得到了第二版:

void *f1(void *arg)
{
	if (condition == 0) {
		pthread_cond_wait(&cond, &mutex);
	}

	printf("I am f1\n");
}

void *f2(void *arg)
{
	printf("I am f2\n");
	condition = 1;
	pthread_cond_signal(&cond);
}

但是这就有一个问题,就是 condition 作为共享变量,有可能出现“共同访问”的问题,比如先调度 1 时完成了对于 condition == 0 的检查,然后切换到了 2,并完成了 signal,然后再切换到 1 来执行 wait,那么就收不到信号了。那么此时就需要用锁来实现原子性,要保证对 condition 的访问和 wait/signal 是绑定在一起的,于是我们得到了第三版:

pthread_cond_t cond;
pthread_mutex_t mutex;
int condition = 0;

void *f1(void *arg)
{
	pthread_mutex_lock(&mutex);
	
	if (condition == 0) {
		pthread_cond_wait(&cond, &mutex);
	}
	
	pthread_mutex_unlock(&mutex);

	printf("I am f1\n");
}

void *f2(void *arg)
{
	printf("I am f2\n");
	
	pthread_mutex_lock(&mutex);
	
	condition = 1;
	pthread_cond_signal(&cond);

	pthread_mutex_unlock(&mutex);
}

由此可见即使在原本的实际中并不涉及共享内存(12 都只打印信息,并不访问共享内存),仅是处于同步的需求,条件变量也必须要和锁紧密配合。这就是在 POSIX 的条件变量的 API 中有锁的原因。

上述简单的例子已经结束了,但是条件变量依然还存在一个问题,也就是“虚假唤醒”。这个问题涉及到了条件变量的实现机制,线程在 wait 的时候,会将锁释放掉,当被其他线程 signal 的时候,它不是立刻执行,而是被放到了 OS 的待调度队列中,当调度到他的时候,他要再去拿锁然后执行,如果没有拿到锁,那么还会被重新插入到 OS 的待调度队列中等待(相当于隐式 yield 了)。

这种机制就存在一个问题了,就是在线程被 signal 唤醒后插入到待调度队列中,到真正拿到锁开始执行的这段时间内,有可能有其他线程修改了 condition 这样的条件,致使从语义上来说,线程应该继续 wait,而因为 if 的原因,使得线程逃离 wait,开始执行。所以我们最后要做的修改就是将 if 改成 while,如下所示:

void *f1(void *arg)
{
	pthread_mutex_lock(&mutex);
	while (condition == 0) {
		pthread_cond_wait(&cond, &mutex);
	}
	pthread_mutex_unlock(&mutex);
	printf("I am f1\n");
}

从这个意义上来看,所谓的“signal 唤醒”,对于被唤醒的进程,并不能保证现在的唤醒符合“可执行”的条件了,而只是“可能可执行”,需要进一步的检查。

3.2 信号量

不可否认,条件变量是丑陋的,他必须配合锁,而且还要搭配 while 才能使用。究其根本,是因为它具有“唤醒丢失”和“虚假唤醒”的问题。

“唤醒丢失”是因为条件变量本身不具有状态导致的,必须用一个共享变量来指导同步,如果我们将一个共享变量封装到其中呢?那我们是不是就不用显式的面对锁的问题了。而反正我们都将共享变量封装到其中了,与之相关的控制语句 while 也封装进去,也不是什么困难的事情了。于是我们就得到了信号量(Semaphore)。

信号量的本质是一个计数器 counter,它具有 Pwait)和 Vpost)两种操作,P 会减少 counter 的值,如果 counter == 0,那么就会让发起 P 操作的线程进入“等待室”;V 会增加 counter 的值,如果 counter > 0,那么就会唤醒等待室中的线程。

可以看到基本上信号量就是将原本的 condition 转换成了 counter 并封装进了自身内部。其实现如下:

sem_wait(s) {
        while(CompareAndSwap(s.flag, 0, 1) == 1);
        s.count--;
        if(s.count < 0) {
                /* 该线程进入 s.queue 队列 */
                /* 阻塞该线程(还需将 s.flag 设置为 0,阻塞和赋值需要是一个原子操作) */
        }
        s.flag = 0;
}

sem_post(s) {
        while(CompareAndSwap(s.flag, 0, 1) == 1);
        s.count++;
        if(s.count <= 0) {
                /* 从s.queue 队列中移出线程 p */
                /* 线程 p 进入就绪队列 */
        }
        s.flag = 0;
}

信号量是 System V 提出的机制,它足够强大,以至于仅凭这一个机制,就可以实现互斥和同步两种原语。

当信号量初始值为 1 的时候,他可以用作互斥,如下所示:

sem_t m;
sem_init(&m, 0, 1);

sem_wait();
// 临界区
sem_post();

而当信号量初始值为 0 时,可以用于同步,这是因为此时就必须先 postwait 才可以避免阻塞:

void *f1(void *arg)
{
	sem_wait();
	printf("I am f1\n");
}

void *f2(void *arg)
{
	printf("I am f2\n");
	sem_post();
}

可以看到信号量非常简洁美妙。


四、生产者消费者模型

生产者消费者模型是非常经典的并行原语的应用。其中因为生产者和消费者会同时访问同一片共享区域,需要使用互斥机制;又因为当消费品为零或者满的时候,需要使消费者或者生产者 wait ,当条件满足时,又需要 signal 生产者和消费者,所以需要同步机制。

并不是所有的共享同一片内存,具有读写两方的模型都被叫作生产者消费者模型,生产者和消费者之间必须有同步机制来提高效率,这样才能叫作生产者消费者机制。同步机制是非常重要的,考虑生产者的效率是消费者的十倍,那么生产者必然很快就会填满缓冲区,随后生产者会重复拿锁进入缓冲区,并不干任何事情就离开,而消费者不容易拿到锁,机会被大量挤占,导致消费能力进一步降低。而有了同步机制,当产品填满缓冲区时,生产者就可以 wait 等待消费者唤醒了,这样更加高效。

生产者消费者模型有“阻塞队列”和“循环队列”两种形式,分别对应条件变量和信号量实现。循环队列的方式更优,因为阻塞队列是用一把锁保护整个共享区域,而实际上只要不是同时对于同一个商品读写,那么其实共享区域是允许同时存在一个生产者和一个消费者的,循环队列配合信号量可以很容易解决这个问题。

更加具体的描述和实现,可以在这里找到。