線程池:簡單地說,線程池 就是預先創建好一批線程,方便、快速地處理收到的業務。比起傳統的到來一個任務,即時創建一個線程來處理,節省了線程的創建和回收的開銷,響應更快,效率更高。
在linux中,使用的是posix線程庫,首先介紹幾個常用的函數:
1 線程的創建和取消函數
pthread_create
創建線程
pthread_join
合並線程
pthread_cancel
取消線程
2 線程同步函數
pthread_mutex_lock
pthread_mutex_unlock
pthread_cond_signal
pthread_cond_wait
關於函數的詳細說明,參考man手冊
線程池的實現:
線程池的實現主要分為三部分,線程的創建、添加任務到線程池中、工作線程從任務隊列中取出任務進行處理。
主要有兩個類來實現,CTask,CThreadPool
/**
執行任務的類,設置任務數據並執行
**/
C代碼
class CTask
{
protected:
string m_strTaskName; //任務的名稱
void* m_ptrData; //要執行的任務的具體數據
public:
CTask(){}
CTask(string taskName)
{
this->m_strTaskName = taskName;
m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data); //設置任務數據
};
任務類是個虛類,所有的任務要從CTask類中繼承 ,實現run接口,run接口中需要實現的就是具體解析任務的邏輯。m_ptrData是指向任務數據的指針,可以是簡單數據類型,也可以是自定義的復雜數據類型。
線程池類
/**
線程池
**/
Java代碼
class CThreadPool
{
private:
vector<CTask*> m_vecTaskList; //任務列表
int m_iThreadNum; //線程池中啟動的線程數
static vector<pthread_t> m_vecIdleThread; //當前空閒的線程集合
static vector<pthread_t> m_vecBusyThread; //當前正在執行的線程集合
static pthread_mutex_t m_pthreadMutex; //線程同步鎖
static pthread_cond_t m_pthreadCond; //線程同步的條件變量
protected:
static void* ThreadFunc(void * threadData); //新線程的線程函數
static int MoveToIdle(pthread_t tid); //線程執行結束後,把自己放入到空閒線程中
static int MoveToBusy(pthread_t tid); //移入到忙碌線程中去
int Create(); //創建所有的線程
public:
CThreadPool(int threadNum);
int AddTask(CTask *task); //把任務添加到線程池中
int StopAll();
};
當線程池對象創建後,啟動一批線程,並把所有的線程放入空閒列表中,當有任務到達時,某一個線程取出任務並進行處理。
線程之間的同步用線程鎖和條件變量。
這個類的對外接口有兩個:
AddTask函數把任務添加到線程池的任務列表中,並通知線程進行處理。當任務到到時,把任務放入m_vecTaskList任務列表中,並用pthread_cond_signal喚醒一個線程進行處理。
StopAll函數停止所有的線程
Cpp代碼
************************************************
代碼:
××××××××××××××××××××CThread.h
#ifndef __CTHREAD
#define __CTHREAD
#include <vector>
#include <string>
#include <pthread.h>
using namespace std;
/**
執行任務的類,設置任務數據並執行
**/
class CTask
{
protected:
string m_strTaskName; //任務的名稱
void* m_ptrData; //要執行的任務的具體數據
public:
CTask(){}
CTask(string taskName)
{
this->m_strTaskName = taskName;
m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data); //設置任務數據
};
/**
線程池
**/
class CThreadPool
{
private:
vector<CTask*> m_vecTaskList; //任務列表
int m_iThreadNum; //線程池中啟動的線程數
static vector<pthread_t> m_vecIdleThread; //當前空閒的線程集合
static vector<pthread_t> m_vecBusyThread; //當前正在執行的線程集合
static pthread_mutex_t m_pthreadMutex; //線程同步鎖
static pthread_cond_t m_pthreadCond; //線程同步的條件變量
protected:
static void* ThreadFunc(void * threadData); //新線程的線程函數
static int MoveToIdle(pthread_t tid); //線程執行結束後,把自己放入到空閒線程中
static int MoveToBusy(pthread_t tid); //移入到忙碌線程中去
int Create(); //創建所有的線程
public:
CThreadPool(int threadNum);
int AddTask(CTask *task); //把任務添加到線程池中