之前的一片博客介紹了用於Linux內核同步的自旋鎖,即使用自旋鎖來保護共享資源,今天介紹另外一種Linux內核同步機制——信號量。信號量在內核中的使用非常廣泛,用於對各種共享資源的保護。信號量與自旋鎖的實現機制是不一樣的,用處也是不一樣的。首先,自旋鎖和信號量都使用了計數器來表示允許同時訪問共享資源的最大進程數,但自旋鎖的共享計數值是1,也就是說任意時刻只有一個進程在共享代碼區運行;信號量卻允許使用大於1的共享計數,即共享資源允許被多個不同的進程同時訪問,當然,信號量的計數器也能設為1,這時信號量也稱為互斥量。其次,自旋鎖用於保護短時間能夠完成操作的共享資源,使用期間不允許進程睡眠和進程切換;信號量常用於暫時無法獲取的共享資源,如果獲取失敗則進程進入不可中斷的睡眠狀態,只能由釋放資源的進程來喚醒。最後,自旋鎖可以用於中斷服務程序之中;信號量不能在中斷服務程序中使用,因為中斷服務程序是不允許進程睡眠的。關於信號量的基本知識已經講解完畢,接下來看看信號量在內核裡面的實現,本文講解的內核版本是linux-2.6.24。
1 數據結構
struct semaphore { atomic_t count; int sleepers; wait_queue_head_t wait; };
信號量使用的數據結構是struct semaphore,包含三個數據成員:count是共享計數值、sleepers是等待當前信號量進入睡眠的進程個數、wait是當前信號量的等待隊列。
2 信號量使用
使用信號量之前要進行初始化,其實只是簡單的設置共享計數和等待隊列,睡眠進程數一開始是0。本文重點講解信號量的使用和實現。信號量操作的API:
static inline void down(struct semaphore * sem)//獲取信號量,獲取失敗則進入睡眠狀態 static inline void up(struct semaphore * sem)//釋放信號量,並喚醒等待隊列中的第一個進程
信號量的使用方式如下:
down(sem); ...臨界區... up(sem);
內核保證正在訪問臨界區的進程數小於或等於初始化的共享計數值,獲取信號量失敗的進程將進入不可中斷的睡眠狀態,在信號量的等待隊列中進行等待。當進程釋放信號量的時候就會喚醒等待隊列中的第一個進程。
3 信號量的實現
3.1 down(sem)
首先看函數的定義:
static inline void down(struct semaphore * sem) { might_sleep(); __asm__ __volatile__( "# atomic down operation\n\t" LOCK_PREFIX "decl %0\n\t" /* --sem->count */ "jns 2f\n" "\tlea %0,%%eax\n\t" "call __down_failed\n" "2:" :"+m" (sem->count) : :"memory","ax"); }
這裡面包含了一些匯編代碼,%0代表sem->count。也就是說先將sem->count減1,LOCK_PREFIX表示執行這條指令時將總線鎖住,保證減1操作是原子的。減1之後如果大於或等於0就轉到標號2處執行,也就跳過了__down_failed函數直接到函數尾部並返回,成功獲取信號量;否則減1之後sem->count小於0則順序執行後面的__down_failed函數。接下來看__down_failed函數的定義:
ENTRY(__down_failed) CFI_STARTPROC FRAME pushl %edx CFI_ADJUST_CFA_OFFSET 4 CFI_REL_OFFSET edx,0 pushl %ecx CFI_ADJUST_CFA_OFFSET 4 CFI_REL_OFFSET ecx,0 call __down popl %ecx CFI_ADJUST_CFA_OFFSET -4 CFI_RESTORE ecx popl %edx CFI_ADJUST_CFA_OFFSET -4 CFI_RESTORE edx ENDFRAME ret CFI_ENDPROC END(__down_failed)
pushl和popl是用於保存和恢復寄存器的,CFI前綴的指令用於指令對齊調整。重點在函數__down,下面來看該函數的定義:
fastcall void __sched __down(struct semaphore * sem) { struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); unsigned long flags; tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irqsave(&sem->wait.lock, flags); add_wait_queue_exclusive_locked(&sem->wait, &wait); sem->sleepers++; for (;;) { int sleepers = sem->sleepers; /* * Add "everybody else" into it. They aren't * playing, because we own the spinlock in * the wait_queue_head. */ if (!atomic_add_negative(sleepers - 1, &sem->count)) { sem->sleepers = 0; break; } sem->sleepers = 1; /* us - see -1 above */ spin_unlock_irqrestore(&sem->wait.lock, flags); schedule(); spin_lock_irqsave(&sem->wait.lock, flags); tsk->state = TASK_UNINTERRUPTIBLE; } remove_wait_queue_locked(&sem->wait, &wait); wake_up_locked(&sem->wait); spin_unlock_irqrestore(&sem->wait.lock, flags); tsk->state = TASK_RUNNING; }