一) 線程鎖
1) 只能用於"鎖"住臨界代碼區域
2) 一個線程加的鎖必須由該線程解鎖.
鎖幾乎是我們學習同步時最開始接觸到的一個策略,也是最簡單, 最直白的策略.
二) 條件變量,與鎖不同, 條件變量用於等待某個條件被觸發
1) 大體使用的偽碼:
// 線程一代碼
pthread_mutex_lock(&mutex);
// 設置條件為true
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
// 線程二代碼
pthread_mutex_lock(&mutex);
while (條件為false)
pthread_cond_wait(&cond, &mutex);
修改該條件
pthread_mutex_unlock(&mutex);
需要注意幾點:
1) 第二段代碼之所以在pthread_cond_wait外面包含一個while循環不停測試條件是否成立的原因是, 在pthread_cond_wait被喚醒的時候可能該條件已經不成立.UNPV2對這個的描述是:"Notice that when pthread_cond_wait returns, we always test the condition again, because spurious wakeups can occur: a wakeup when the desired condition is still not true.".
2) pthread_cond_wait調用必須和某一個mutex一起調用, 這個mutex是在外部進行加鎖的mutex, 在調用pthread_cond_wait時, 內部的實現將首先將這個mutex解鎖, 然後等待條件變量被喚醒, 如果沒有被喚醒, 該線程將一直休眠, 也就是說, 該線程將一直阻塞在這個pthread_cond_wait調用中, 而當此線程被喚醒時, 將自動將這個mutex加鎖.
man文檔中對這部分的說明是:
pthread_cond_wait atomically unlocks the mutex (as per pthread_unlock_mutex) and waits for the condition variable cond to be signaled. The thread execution is suspended and does not consume any CPU time until the condition variable is
signaled. The mutex must be locked by the calling thread on entrance to pthread_cond_wait. Before returning to the calling thread, pthread_cond_wait re-acquires mutex (as per pthread_lock_mutex).
也就是說pthread_cond_wait實際上可以看作是以下幾個動作的合體:
解鎖線程鎖
等待條件為true
加鎖線程鎖.
這裡是使用條件變量的經典例子:
http://www.cppblog.com/CppExplore/archive/2008/03/20/44949.html
之所以使用兩個條件變量, 是因為有兩種情況需要進行保護,使用數組實現循環隊列,因此一個條件是在getq函數中判斷讀寫指針相同且可讀數據計數為0,此時隊列為空沒有數據可讀,因此獲取新數據的條件變量就一直等待,另一個條件是讀寫指針相同且可讀數據計數大於0,此時隊列滿了不能再添加數據, 因此添加新數據的條件變量就一直等待,而nEmptyThreadNum和nFullThreadNum則是計數, 只有這個計數大於0時才會喚醒相應的條件變量,這樣可以減少調用pthread_cond_signal的次數.
為了在下面的敘述方便, 我將這段代碼整理在下面, 是一個可以編譯運行的代碼,但是注意需要在編譯時加上-pthread鏈接線程庫:
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
class CThreadQueue
{
public:
CThreadQueue(int queueSize=1024):
sizeQueue(queueSize),lput(0),lget(0),nFullThread(0),nEmptyThread(0),nData(0)
{
pthread_mutex_init(&mux,0);
pthread_cond_init(&condGet,0);
pthread_cond_init(&condPut,0);
buffer=new void *[sizeQueue];
}
virtual ~CThreadQueue()
{
delete[] buffer;
}
void * getq()
{
void *data;
pthread_mutex_lock(&mux);
/*
此處循環判斷的原因如下:假設2個線程在getq阻塞,然後兩者都被激活,而其中一個線程運行比較塊,快速消耗了2個數據,另一個線程醒來的時候已經沒有新數據可以消耗了。另一點,man pthread_cond_wait可以看到,該函數可以被信號中斷返回,此時返回EINTR。為避免以上任何一點,都必須醒來後再次判斷睡眠條件。更正:pthread_cond_wait是信號安全的系統調用,不會被信號中斷。
*/
while(lget==lput&&nData==0)
{
nEmptyThread++;
pthread_cond_wait(&condGet,&mux);
nEmptyThread--;
}
data=buffer[lget++];
nData--;
if(lget==sizeQueue)
{
lget=0;
}
if(nFullThread) //必要時才進行signal操作,勿總是signal
{
pthread_cond_signal(&condPut);
}
pthread_mutex_unlock(&mux);
return data;
}
void putq(void *data)
{
pthread_mutex_lock(&mux);
while(lput==lget&&nData)
{
nFullThread++;
pthread_cond_wait(&condPut,&mux);
nFullThread--;
}
buffer[lput++]=data;
nData++;
if(lput==sizeQueue)
{
lput=0;
}
if(nEmptyThread) //必要時才進行signal操作,勿總是signal
{
pthread_cond_signal(&condGet);
}
pthread_mutex_unlock(&mux);
}
private:
pthread_mutex_t mux;
pthread_cond_t condGet;
pthread_cond_t condPut;
void * * buffer; //循環消息隊列
int sizeQueue; //隊列大小
int lput; //location put 放數據的指針偏移
int lget; //location get 取數據的指針偏移
int nFullThread; //隊列滿,阻塞在putq處的線程數
int nEmptyThread; //隊列空,阻塞在getq處的線程數
int nData; //隊列中的消息個數,主要用來判斷隊列空還是滿
};
CThreadQueue queue;//使用的時候給出稍大的CThreadQueue初始化參數,可以減少進入內核態的操作。
void * produce(void * arg)
{
int i=0;
pthread_detach(pthread_self());
while(i++<100)
{
queue.putq((void *)i);
}
}
void *consume(void *arg)
{
int data;
while(1)
{
data=(int)(queue.getq());
printf("data=%d\n",data);
}
}
int main()
{
pthread_t pid;
int i=0;
while(i++<3)
pthread_create(&pid,0,produce,0);
i=0;
while(i++<3)
pthread_create(&pid,0,consume,0);
sleep(5);
return 0;
}
三) 信號量
信號量既可以作為二值計數器(即0,1),也可以作為資源計數器.
主要是兩個函數:
sem_wait() decrements (locks) the semaphore pointed to by sem. If the semaphore's value is greater than zero, then
the decrement proceeds, and the function returns, immediately. If the semaphore currently has the value zero, then
the call blocks until either it becomes possible to perform the decrement (i.e., the semaphore value rises above
zero), or a signal handler interrupts the call.
sem_post() increments (unlocks) the semaphore pointed to by sem. If the semaphore's value consequently becomes
greater than zero, then another process or thread blocked in a sem_wait(3) call will be woken up and proceed to lock
the semaphore.
而函數int sem_getvalue(sem_t *sem, int *sval);則用於獲取信號量當前的計數.
可以用信號量模擬鎖和條件變量:
1) 鎖,在同一個線程內同時對某個信號量先調用sem_wait再調用sem_post, 兩個函數調用其中的區域就是所要保護的臨界區代碼了,這個時候其實信號量是作為二值計數器來使用的.不過在此之前要初始化該信號量計數為1,見下面例子中的代碼.
2) 條件變量,在某個線程中調用sem_wait, 而在另一個線程中調用sem_post.
我們將上面例子中的線程鎖和條件變量都改成用信號量實現以說明信號量如何模擬兩者:
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <errno.h>
#include <string.h>
class CThreadQueue
{
public:
CThreadQueue(int queueSize=1024):
sizeQueue(queueSize),lput(0),lget(0),nFullThread(0),nEmptyThread(0),nData(0)
{
//pthread_mutex_init(&mux,0);
mux = sem_open("mutex", O_RDWR | O_CREAT);
get = sem_open("get", O_RDWR | O_CREAT);
put = sem_open("put", O_RDWR | O_CREAT);
sem_init(mux, 0, 1);
buffer=new void *[sizeQueue];
}
virtual ~CThreadQueue()
{
delete[] buffer;
sem_unlink("mutex");
sem_unlink("get");
sem_unlink("put");
}
void * getq()
{
void *data;
//pthread_mutex_lock(&mux);
sem_wait(mux);
while(lget==lput&&nData==0)
{
nEmptyThread++;
//pthread_cond_wait(&condGet,&mux);
sem_wait(get);
nEmptyThread--;
}
data=buffer[lget++];
nData--;
if(lget==sizeQueue)
{
lget=0;
}
if(nFullThread) //必要時才進行signal操作,勿總是signal
{
//pthread_cond_signal(&condPut);
sem_post(put);
}
//pthread_mutex_unlock(&mux);
sem_post(mux);
return data;
}
void putq(void *data)
{
//pthread_mutex_lock(&mux);
sem_wait(mux);
while(lput==lget&&nData)
{
nFullThread++;
//pthread_cond_wait(&condPut,&mux);
sem_wait(put);
nFullThread--;
}
buffer[lput++]=data;
nData++;
if(lput==sizeQueue)
{
lput=0;
}
if(nEmptyThread)
{
//pthread_cond_signal(&condGet);
sem_post(get);
}
//pthread_mutex_unlock(&mux);
sem_post(mux);
}
private:
//pthread_mutex_t mux;
sem_t* mux;
//pthread_cond_t condGet;
//pthread_cond_t condPut;
sem_t* get;
sem_t* put;
void * * buffer; //循環消息隊列
int sizeQueue; //隊列大小
int lput; //location put 放數據的指針偏移
int lget; //location get 取數據的指針偏移
int nFullThread; //隊列滿,阻塞在putq處的線程數
int nEmptyThread; //隊列空,阻塞在getq處的線程數
int nData; //隊列中的消息個數,主要用來判斷隊列空還是滿
};
CThreadQueue queue;//使用的時候給出稍大的CThreadQueue初始化參數,可以減少進入內核態的操作。
void * produce(void * arg)
{
int i=0;
pthread_detach(pthread_self());
while(i++<100)
{
queue.putq((void *)i);
}
}
void *consume(void *arg)
{
int data;
while(1)
{
data=(int)(queue.getq());
printf("data=%d\n",data);
}
}
int main()
{
pthread_t pid;
int i=0;
while(i++<3)
pthread_create(&pid,0,produce,0);
i=0;
while(i++<3)
pthread_create(&pid,0,consume,0);
sleep(5);
return 0;
}