1.任务队列

类声明(Task,TaskQueue)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//
// Created by keqiu on 2023/7/18.
//

#ifndef C___PRACTICE_TASKQUEUE_H
#define C___PRACTICE_TASKQUEUE_H

#include <queue>
#include <pthread.h>

using function = void(*)(void* arg);//为一个函数指针类型取别名


struct Task{

Task(){
working = nullptr;
arg = nullptr;
}
/*无参构造*/
Task(function working, void* arg){
this->working = working;
this->arg = arg;
}
/*有参构造*/
function working;
void* arg;
};


class TaskQueue{

private:
std::queue<Task> taskQueue;//任务队列,用容器实现
pthread_mutex_t QueueLock;//互斥锁

public:
TaskQueue();

~TaskQueue();

void addTask(Task& task);
/*添加任务*/
void addTask(function working, void* arg);
/*addTask重载函数*/
Task takeTask();
/*取出任务*/
inline size_t getTaskSize(){
return taskQueue.size();
}
/*获取任务队列大小*/

};
#endif //C___PRACTICE_TASKQUEUE_H

其中Task 是任务类,里边有两个成员,分别是两个指针 void(*)(void*)void*

另外一个类TaskQueue是任务队列,提供了添加任务、取出任务、存储任务、获取任务个数、线程同步的功能。

类定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//
// Created by keqiu on 2023/7/18.
//

#include "TaskQueue.h"

TaskQueue::TaskQueue()
{
pthread_mutex_init(&QueueLock, nullptr);
}
//构造函数

TaskQueue::~TaskQueue()
{
pthread_mutex_destroy(&QueueLock);
}
//析构函数

void TaskQueue::addTask(Task& task)
{

pthread_mutex_lock(&QueueLock);//此处如果资源已经被释放,就会发生段错误
taskQueue.push(task);
pthread_mutex_unlock(&QueueLock);

}

void TaskQueue::addTask(function working, void *arg)
{
pthread_mutex_lock(&QueueLock);
taskQueue.push(Task(working, arg));
pthread_mutex_unlock(&QueueLock);
}


Task TaskQueue::takeTask()
{
pthread_mutex_lock(&QueueLock);
Task t;
if(!taskQueue.empty())
{
t = taskQueue.front();//取出队列头部元素
taskQueue.pop();//删除队列头部元素
}
pthread_mutex_unlock(&QueueLock);

return t;
}

2.线程池

类声明(ThreadPool)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//
// Created by keqiu on 2023/7/18.
//

#ifndef C___PRACTICE_THREADPOOL_H
#define C___PRACTICE_THREADPOOL_H

#include "TaskQueue.h"


class ThreadPool {

private:
TaskQueue* taskQueue = nullptr;//任务队列
pthread_t managerID;//管理者线程id
pthread_t* threadIDs;
int minNum;//最小线程数
int maxNum;//最大线程数
int busyNum;//忙线程数
int liveNum;//活着的线程数
int exitNum;//要销毁的线程数
pthread_mutex_t threadLock;//锁住整个线程池
pthread_mutex_t busyLock;//单独锁住busyNum,提高效率
pthread_cond_t notEmpty;//任务队列为空时阻塞,不为空时唤醒

bool shutdown;//是否要销毁线程池

static void* worker(void* arg);
//工作线程函数
static void* manager(void* arg);
//管理者线程函数
void threadExit();

public:
ThreadPool(int min,int max);
//构造函数
~ThreadPool();
//析构函数
void addTask(Task task);
//添加任务
inline int getBusyNumber();
//获取忙线程数
inline int getAliveNumber();
//获取活着的线程数


};
#endif //C___PRACTICE_THREADPOOL_H

类定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
//
// Created by keqiu on 2023/7/18.
//

#include "ThreadPool.h"
#include<iostream>
#include<cstring>
#include<unistd.h>

using std::cout;
using std::endl;

#define CHECK_TIME 5//默认管理者线程检测时间
#define ADD_NUM 2//一次添加3个线程
#define EXIT_NUM 2//一次销毁3个线程

template<typename T>
inline void safe_delete_void_ptr(void* &target);

ThreadPool::ThreadPool(int min, int max)
{
do
{

taskQueue = new (std::nothrow)TaskQueue;
/*没有std::nothrow则if条件始终为false*/
if(taskQueue == nullptr)
{
cout << "new taskQueue error" << endl;
break;
}


threadIDs = new (std::nothrow)pthread_t[max];//分配动态数组
/*没有std::nothrow则if条件始终为false*/
if(threadIDs == nullptr)
{
cout << "new threadIDs error" << endl;
break;
}

memset(threadIDs, 0, sizeof(pthread_t) * max);
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min;
exitNum = 0;

if(pthread_mutex_init(&threadLock, nullptr) != 0 ||
pthread_mutex_init(&busyLock, nullptr) != 0 ||
pthread_cond_init(&notEmpty, nullptr) != 0)
{
cout << "init the lock or cond error" << endl;
break;
}

shutdown = false;

/*
* 类的成员函数传递地址时失败报错:Reference to non-static member function must be called
* 原因:类的成员函数不是静态时,函数没有地址,只有实例化之后才会有地址,所以不能传递地址
* 两种解决方案:
* 1.将成员函数改为静态函数
* 2.将成员函数改为全局函数,独立于类之外(想要访问成员变量就必须加上友元函数)
*
* */
pthread_create(&managerID, nullptr, manager, this);
for(int i = 0; i < min; ++i)
{
pthread_create(&threadIDs[i], nullptr, worker, this);
}

return;//此处不返回将会导致之后的释放资源

}while(false);

if(threadIDs)
{
delete[] threadIDs;
threadIDs = nullptr;
}//如果threadIDs不为空,释放内存
if(taskQueue)
{
delete taskQueue;
taskQueue = nullptr;
}//如果taskQueue不为空,释放内存

}


void* ThreadPool::worker(void* arg)
{
auto pool = static_cast<ThreadPool*>(arg);//传递的是(void*)this,所以要强制类型转换
/*此处static_cast为C++的强制类型转换,也可用C中的(ThreadPool*)形式
*
*
* 静态函数不能访问成员变量,所以必须传入该类的实例化对象
*
* */
while(true)
{

pthread_mutex_lock(&pool->threadLock);
while(pool->taskQueue->getTaskSize()==0 && !pool->shutdown)
{
cout<<"thread "<<pthread_self()<<" is waiting"<<endl;
pthread_cond_wait(&pool->notEmpty, &pool->threadLock);
//如果任务队列为空,且线程池没有关闭,则等待
//等待条件变量,会先解锁,等待被唤醒,再次拿到了这把锁

/*管理者销毁线程逻辑
*
* 唤醒后判断是否要销毁线程
* */
if (pool->exitNum > 0)
{
pool->exitNum--;//不能放在if里面,否则可能在产生新线程时,Num>0,直接就被销毁了,不符合期望,故保证每次循环都减一
/*限定当存活的线程个数大于最小个数时才销毁*/
if (pool->liveNum > pool->minNum) {
pool->liveNum--;
pthread_mutex_unlock(&pool->threadLock);//解锁
pool->threadExit();
}
}
}

if(pool->shutdown)//如果线程池关闭,则线程全部退出
{
pthread_mutex_unlock(&pool->threadLock);//避免死锁
pool->threadExit();
}

//取出任务工作
Task task = pool->taskQueue->takeTask();

//此处生产者不需要被唤醒了,任务队列可以无限大,即可以无限量的任务
pthread_mutex_unlock(&pool->threadLock);//解锁

pthread_mutex_lock(&pool->busyLock);
pool->busyNum++;
pthread_mutex_unlock(&pool->busyLock);
cout << "thread " << pthread_self() << " start working" << endl;


task.working(task.arg);//执行任务
safe_delete_void_ptr<int>(task.arg);//直接delete task.arg 有危险
task.arg = nullptr;

cout << "thread " << pthread_self() << " end working" << endl;
pthread_mutex_lock(&pool->busyLock);
pool->busyNum--;
pthread_mutex_unlock(&pool->busyLock);
}
}


void* ThreadPool::manager(void *arg)
{
auto pool = static_cast<ThreadPool*>(arg);

while(!pool->shutdown)
{
sleep(CHECK_TIME);//每隔3秒检测一次

pthread_mutex_lock(&pool->threadLock);
size_t queueSize = pool->taskQueue->getTaskSize();//取出任务个数
int liveNum = pool->liveNum;//取出存活线程个数
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->threadLock);



//添加线程规则
//1.任务个数>存活线程个数&&存活线程个数<最大线程个数时,创建线程
if(queueSize>liveNum&&liveNum<pool->maxNum)
{
pthread_mutex_lock(&pool->threadLock);

int counter = 0;
/*for循环三个条件
* 1.遍历线程id
* 2.counter<ADD_NUM
* 3.在存活线程个数小于最大线程个数时才添加*/
for(int i=0;i<pool->maxNum&&counter<ADD_NUM&&pool->liveNum<pool->maxNum;i++)
{
//找到空闲的线程id,创建线程
if(pool->threadIDs[i]==0)
{
pthread_create(&pool->threadIDs[i],nullptr,worker,pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->threadLock);
}

//销毁线程规则
//1.忙线程*2<存活线程&&存活线程>最小线程个数时,销毁线程
if(busyNum*2<liveNum&&liveNum>pool->minNum)
{
pthread_mutex_lock(&pool->threadLock);
pool->exitNum = EXIT_NUM;
pthread_mutex_unlock(&pool->threadLock);
//唤醒线程池中所有的线程,让他们自己执行,判断是否退出
for(int i=0;i<EXIT_NUM;i++)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}

return nullptr;
}

void ThreadPool::threadExit()
{
/*使线程退出时线程id归0*/
pthread_t tid = pthread_self();//获取当前线程的id
for(int i = 0; i < maxNum; ++i)
{
if(threadIDs[i] == tid)
{
threadIDs[i] = 0;
cout << "threadExit() called, threadID: " << tid << " exiting..." << endl;
break;
}
}
pthread_exit(NULL);//C的函数,此处不用nullptr,而是用NULL
}


void ThreadPool::addTask(Task task)
{

//此处不用判断任务队列是否满,因为任务队列可以无限大
if(shutdown)
{
return;
}//如果线程池关闭,则不添加任务

taskQueue->addTask(task);//添加任务,此时不用上锁原因,因为addTask中已经上锁了

pthread_cond_signal(&notEmpty);//唤醒线程池中的消费者线程
}


int ThreadPool::getBusyNumber()
{
//用busyNum自己的锁,提高效率
pthread_mutex_lock(&busyLock);
int busyNum = this->busyNum;
pthread_mutex_unlock(&busyLock);
return busyNum;
}


int ThreadPool::getAliveNumber()
{
//用线程池的锁
pthread_mutex_lock(&threadLock);
int liveNum = this->liveNum;
pthread_mutex_unlock(&threadLock);
return liveNum;
}


ThreadPool::~ThreadPool()
{
this->shutdown = true;

pthread_join(this->managerID, nullptr);//等待管理者线程结束

for(int i = 0; i < this->liveNum; ++i)
{
pthread_cond_signal(&notEmpty);//唤醒所有线程
}


pthread_mutex_destroy(&threadLock);
pthread_mutex_destroy(&busyLock);
pthread_cond_destroy(&notEmpty);

if(taskQueue)
{
delete taskQueue;
}

if(threadIDs)
{
delete [] threadIDs;
}

}

template <typename T>
void safe_delete_void_ptr(void* &target)
{
if(target)
{
T* temp = static_cast<T*>(target);
delete temp;
temp = nullptr;
target = nullptr;
}
}//

遇到的问题

注意25行用new分配内存C++中,new操作符默认不会返回NULL,而是会在分配内存失败时抛出一个std::bad_alloc异常。这样做的好处是,你不需要在每次使用new操作符后检查返回值是否为NULL,而是可以用try/catch语句统一处理异常。如果你想让new操作符在分配内存失败时返回NULL,而不是抛出异常,你可以使用std::nothrow参数

解决方案:

  • try/catch语句统一处理异常
  • std::nothrow参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//1.第一种
try {
taskQueue = new TaskQueue;
}
catch (std::bad_alloc &e) {
cout << "new taskQueue error" << endl;
break;
}

try {
threadIDs = new pthread_t[max];//分配动态数组
}
catch (std::bad_alloc &e) {
cout << "new threadIDs error" << endl;
break;
}
/////////////////////

//第二种,见上方代码中使用方法,25行

注意这里133行代码,C++中不能用delete直接删除一个void*指针,因为delete需要调用被删除对象的析构函数,而void指针没有类型信息,所以无法确定调用哪个析构函数。这样做会导致未定义行为,可能会出现崩溃或内存泄漏。你必须在delete之前把void指针转换回它原来的类型,才能正确地释放内存。

  • 解决方案一:使用模板定义内联函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
template<typename T>
inline void safe_delete_void_ptr(void* &target);

template <typename T>
void safe_delete_void_ptr(void* &target)
{
if(target)
{
T* temp = static_cast<T*>(target);
delete temp;
temp = nullptr;
target = nullptr;
}
}//

[ C++指针编程你要小心的陷阱——如何优雅的释放指针void*_c++释放void指针_二流小宝的博客-CSDN博客](https://blog.csdn.net/SweetTool/article/details/77688337?ops_request_misc={"request_id"%3A"168977949416800222881353"%2C"scm"%3A"20140713.130102334.."}&request_id=168977949416800222881353&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~baidu_landing_v2~default-1-77688337-null-null.142^v90^insert_down28v1,239^v2^insert_chatgpt&utm_term=c%2B%2B delete void 指针&spm=1018.2226.3001.4187)

  • 解决方案二:用模板类

3.修改为模板类处理指针释放问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//
// Created by keqiu on 2023/7/18.
//

#ifndef C___PRACTICE_TASKQUEUE_H
#define C___PRACTICE_TASKQUEUE_H

#include <queue>
#include <pthread.h>

using function = void(*)(void* arg);//为一个函数指针类型取别名

template <typename T>//含有模板类的函数或类都要加这个
struct Task{

Task<T>(){
working = nullptr;
arg = nullptr;
}
/*无参构造*/
Task<T>(function working, void* arg){
this->working = working;
this->arg = (T*)arg;
}
/*有参构造*/
function working;
T* arg;
};

template <typename T>
class TaskQueue{

private:
std::queue<Task<T>> taskQueue;//任务队列,用容器实现
pthread_mutex_t QueueLock;//互斥锁

public:
TaskQueue();

~TaskQueue();

void addTask(Task<T> task);
/*添加任务*/
void addTask(function working, void* arg);
/*addtask重载函数*/
Task<T> takeTask();
/*取出任务*/
inline size_t getTaskSize(){
return taskQueue.size();
}
/*获取任务队列大小*/

};
#endif //C___PRACTICE_TASKQUEUE_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//
// Created by keqiu on 2023/7/18.
//

#include "TaskQueue.h"
#include <iostream>
template <typename T>
TaskQueue<T>::TaskQueue()
{
pthread_mutex_init(&QueueLock, nullptr);
}
//构造函数
template <typename T>//模板类前面都要加这个
TaskQueue<T>::~TaskQueue()
{
pthread_mutex_destroy(&QueueLock);
}
//析构函数
template <typename T>
void TaskQueue<T>::addTask(Task<T> task)
{

pthread_mutex_lock(&QueueLock);//此处如果资源已经被释放,就会发生段错误
taskQueue.push(task);
pthread_mutex_unlock(&QueueLock);

}
template <typename T>
void TaskQueue<T>::addTask(function working, void *arg)
{
pthread_mutex_lock(&QueueLock);
taskQueue.push(Task<T>(working, arg));
pthread_mutex_unlock(&QueueLock);
}

template <typename T>
Task<T> TaskQueue<T>::takeTask()
{
pthread_mutex_lock(&QueueLock);
Task<T> t;
if(!taskQueue.empty())
{
t = taskQueue.front();//取出队列头部元素
taskQueue.pop();//删除队列头部元素
}
pthread_mutex_unlock(&QueueLock);

return t;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//
// Created by keqiu on 2023/7/18.
//

#ifndef C___PRACTICE_THREADPOOL_H
#define C___PRACTICE_THREADPOOL_H
#include "TaskQueue.h"
#include "TaskQueue.cpp"
/*特殊:
* 当有类模板时,类模板的声明和定义分开写时,需要包含头文件和源文件*/

template <typename T>
class ThreadPool {
private:
TaskQueue<T>* taskQueue;//任务队列
pthread_t managerID;//管理者线程id
pthread_t* threadIDs;
int minNum;//最小线程数
int maxNum;//最大线程数
int busyNum;//忙线程数
int liveNum;//活着的线程数
int exitNum;//要销毁的线程数
pthread_mutex_t threadLock;//锁住整个线程池
pthread_mutex_t busyLock;//单独锁住busyNum,提高效率
pthread_cond_t notEmpty;//任务队列为空时阻塞,不为空时唤醒

bool shutdown;//是否要销毁线程池

static void* worker(void* arg);
//工作线程函数
static void* manager(void* arg);
//管理者线程函数
void threadExit();

public:
ThreadPool(int min,int max);
//构造函数
~ThreadPool();
//析构函数
void addTask(Task<T> task);
//添加任务
inline int getBusyNumber();
//获取忙线程数
inline int getAliveNumber();
//获取活着的线程数


};
#endif //C___PRACTICE_THREADPOOL_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
//
// Created by keqiu on 2023/7/18.
//

#include "ThreadPool.h"
#include<iostream>
#include<cstring>
#include<unistd.h>

using std::cout;
using std::endl;

#define CHECK_TIME 5//默认管理者线程检测时间
#define ADD_NUM 2//一次添加3个线程
#define EXIT_NUM 2//一次销毁3个线程

template <typename T>
ThreadPool<T>::ThreadPool(int min, int max)
{

do
{
taskQueue = new TaskQueue<T>;
if(taskQueue == nullptr)
{
cout << "new taskQueue error" << endl;
break;
}

threadIDs = new pthread_t[max];//分配动态数组
if(threadIDs == nullptr)
{
cout << "new threadIDs error" << endl;
break;
}

memset(threadIDs, 0, sizeof(pthread_t) * max);
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min;
exitNum = 0;

if(pthread_mutex_init(&threadLock, nullptr) != 0 ||
pthread_mutex_init(&busyLock, nullptr) != 0 ||
pthread_cond_init(&notEmpty, nullptr) != 0)
{
cout << "init the lock or cond error" << endl;
break;
}

shutdown = false;

/*
* 类的成员函数传递地址时失败报错:Reference to non-static member function must be called
* 原因:类的成员函数不是静态时,函数没有地址,只有实例化之后才会有地址,所以不能传递地址
* 两种解决方案:
* 1.将成员函数改为静态函数
* 2.将成员函数改为全局函数,独立于类之外(想要访问成员变量就必须加上友元函数)
*
* */
pthread_create(&managerID, nullptr, manager, this);
for(int i = 0; i < min; ++i)
{
pthread_create(&threadIDs[i], nullptr, worker, this);
}

return;//此处不返回将会导致之后的释放资源

}while(false);

if(threadIDs)
{
delete[] threadIDs;
threadIDs = nullptr;
}//如果threadIDs不为空,释放内存
if(taskQueue)
{
delete taskQueue;
taskQueue = nullptr;
}//如果taskQueue不为空,释放内存

}

template <typename T>
void* ThreadPool<T>::worker(void* arg)
{
auto pool = static_cast<ThreadPool*>(arg);//传递的是(void*)this,所以要强制类型转换
/*此处为C++的强制类型转换,也可用C中的(ThreadPool*)
*
*
* 静态函数不能访问成员变量,所以必须传入该类的实例化对象
*
* */
while(true)
{

pthread_mutex_lock(&pool->threadLock);
while(pool->taskQueue->getTaskSize()==0 && !pool->shutdown)
{
cout<<"thread "<<pthread_self()<<" is waiting"<<endl;
pthread_cond_wait(&pool->notEmpty, &pool->threadLock);
//如果任务队列为空,且线程池没有关闭,则等待
//等待条件变量,会先解锁,等待被唤醒,再次拿到了这把锁

/*管理者销毁线程逻辑
*
* 唤醒后判断是否要销毁线程
* */
if (pool->exitNum > 0)
{
pool->exitNum--;//不能放在if里面,否则可能在产生新线程时,Num>0,直接就被销毁了,不符合期望,故保证每次循环都减一
/*限定当存活的线程个数大于最小个数时才销毁*/
if (pool->liveNum > pool->minNum) {
pool->liveNum--;
pthread_mutex_unlock(&pool->threadLock);//解锁
pool->threadExit();
}
}
}

if(pool->shutdown)//如果线程池关闭,则线程全部退出
{
pthread_mutex_unlock(&pool->threadLock);//避免死锁
pool->threadExit();
}

//取出任务工作
Task task = pool->taskQueue->takeTask();

//此处生产者不需要被唤醒了,任务队列可以无限大,即可以无限量的任务
pthread_mutex_unlock(&pool->threadLock);//解锁

pthread_mutex_lock(&pool->busyLock);
pool->busyNum++;
pthread_mutex_unlock(&pool->busyLock);
cout << "thread " << pthread_self() << " start working" << endl;


task.working(task.arg);//执行任务
delete task.arg;//释放参数资源
task.arg = nullptr;

cout << "thread " << pthread_self() << " end working" << endl;
pthread_mutex_lock(&pool->busyLock);
pool->busyNum--;
pthread_mutex_unlock(&pool->busyLock);
}
}

template <typename T>
void* ThreadPool<T>::manager(void *arg)
{
auto pool = static_cast<ThreadPool*>(arg);

while(!pool->shutdown)
{
sleep(CHECK_TIME);//每隔3秒检测一次

pthread_mutex_lock(&pool->threadLock);
size_t queueSize = pool->taskQueue->getTaskSize();//取出任务个数
int liveNum = pool->liveNum;//取出存活线程个数
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->threadLock);



//添加线程规则
//1.任务个数>存活线程个数&&存活线程个数<最大线程个数时,创建线程
if(queueSize>liveNum&&liveNum<pool->maxNum)
{
pthread_mutex_lock(&pool->threadLock);

int counter = 0;
/*for循环三个条件
* 1.遍历线程id
* 2.counter<ADD_NUM
* 3.在存活线程个数小于最大线程个数时才添加*/
for(int i=0;i<pool->maxNum&&counter<ADD_NUM&&pool->liveNum<pool->maxNum;i++)
{
//找到空闲的线程id,创建线程
if(pool->threadIDs[i]==0)
{
pthread_create(&pool->threadIDs[i],nullptr,worker,pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->threadLock);
}

//销毁线程规则
//1.忙线程*2<存活线程&&存活线程>最小线程个数时,销毁线程
if(busyNum*2<liveNum&&liveNum>pool->minNum)
{
pthread_mutex_lock(&pool->threadLock);
pool->exitNum = EXIT_NUM;
pthread_mutex_unlock(&pool->threadLock);
//唤醒线程池中所有的线程,让他们自己执行,判断是否退出
for(int i=0;i<EXIT_NUM;i++)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}

return nullptr;
}

template <typename T>
void ThreadPool<T>::threadExit()
{
/*使线程退出时线程id归0*/
pthread_t tid = pthread_self();//获取当前线程的id
for(int i = 0; i < maxNum; ++i)
{
if(threadIDs[i] == tid)
{
threadIDs[i] = 0;
cout << "threadExit() called, threadID: " << tid << " exiting..." << endl;
break;
}
}
pthread_exit(NULL);//C的函数,此处不用nullptr,而是用NULL
}

template <typename T>
void ThreadPool<T>::addTask(Task<T> task)
{

//此处不用判断任务队列是否满,因为任务队列可以无限大
if(shutdown)
{
return;
}//如果线程池关闭,则不添加任务

taskQueue->addTask(task);//添加任务,此时不用上锁原因,因为addTask中已经上锁了

pthread_cond_signal(&notEmpty);//唤醒线程池中的消费者线程
}

template <typename T>
int ThreadPool<T>::getBusyNumber()
{
//用busyNum自己的锁,提高效率
pthread_mutex_lock(&busyLock);
int busyNum = this->busyNum;
pthread_mutex_unlock(&busyLock);
return busyNum;
}

template <typename T>
int ThreadPool<T>::getAliveNumber()
{
//用线程池的锁
pthread_mutex_lock(&threadLock);
int liveNum = this->liveNum;
pthread_mutex_unlock(&threadLock);
return liveNum;
}

template <typename T>
ThreadPool<T>::~ThreadPool()
{
this->shutdown = true;//

pthread_join(this->managerID, nullptr);//等待管理者线程结束

for(int i = 0; i < this->liveNum; ++i)
{
pthread_cond_signal(&notEmpty);//唤醒所有线程
}


pthread_mutex_destroy(&threadLock);
pthread_mutex_destroy(&busyLock);
pthread_cond_destroy(&notEmpty);

if(taskQueue)
{
delete taskQueue;
}

if(threadIDs)
{
delete [] threadIDs;
}


}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include"ThreadPool.h"
#include"ThreadPool.cpp"
#include<unistd.h>

void working(void* arg)
{
int num = *(int*)arg;
std::cout << "thread " << pthread_self() << " is working, number = " << num << std::endl;
sleep(1);
}

int main(){

ThreadPool<int> pool(3, 10);
for(int i=0;i<100;i++)
{
int*num = new int;//分配空间
*num = i+100;
pool.addTask(Task<int>(working, num));
}

sleep(30);

return 0;
}

特殊:当有类模板时,类模板的声明和定义分开写时,需要包含头文件和源文件,否则会报找不到定义的错误