C++11-ThreadPool

c++11实现线程池

包含了以下技术:std::mutexstd::condition_variablestd::functionstd::bind可变参模板std::packaged_taskstd::futurelambda表达式std::shared_ptrdecltypeauto等。
原理全在注释中。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#include <iostream>
#include <mutex>
#include <queue>
#include <functional>
#include <future>
#include <thread>
//#include <utility>
#include <vector>
#include <random>

//Thread safe implementation of a Queue using a std::queue
template<typename T>
class SafeQueue{
private:
std::queue<T> m_queue;
std::mutex m_mutex;
public:
constexpr SafeQueue() noexcept {}
SafeQueue(const SafeQueue<T>&) = delete;
SafeQueue(SafeQueue<T>&&) = delete;
SafeQueue<T>& operator=(const SafeQueue<T>&) = delete;
SafeQueue<T>& operator=(SafeQueue<T>&&) = delete;
~SafeQueue() {}
//向队列添加元素
void push(T&& element) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(std::move(element));
}
//取出元素
void pop(T& element) {
std::unique_lock<std::mutex> lock(m_mutex);
element = std::move(m_queue.front());
m_queue.pop();
}
//返回队列是否为空
bool empty() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
//返回队列中元素长度
int size() {
std::unique_lock<std::mutex>(m_mutex);
return m_queue.size();
}
};

//工作线程类
class ThreadPool;
class ThreadWorker {
private:
//工作id
int m_id;
//所属线程池
ThreadPool* m_pool;
public:
constexpr ThreadWorker(ThreadPool* pool, const int id)noexcept
:m_pool(pool), m_id(id) {}
//重载()操作符
void operator()();
};
//线程池
class ThreadPool {
friend class ThreadWorker;
public:
//线程池的构造函数
ThreadPool(const int num = 4)
: m_threads(std::vector<std::thread>(num)), m_shutdown(false) {}
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
~ThreadPool() {
//唤醒所有工作线程
m_shutdown = true;
//使他们正常退出
m_condition_variable.notify_all();
for (int i = 0, size = m_threads.size(); i < size; ++i) {
//在一个线程对象被析构之前,必须调用 join 或 detach,否则会在运行时引发错误。
if (m_threads[i].joinable()) {
m_threads[i].join();
}
}
}
//初始化线程池
void init() {
for (int i = 0, size = m_threads.size(); i < size; ++i) {
//分配工作线程
m_threads[i] = std::thread(ThreadWorker(this, i));
}

}

/*-----------------------------核心函数-------------------------------*/
//向线程池提交一个函数去异步执行。
template<typename F,typename... Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
//用bind创建一个可调用对象
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
//使用shared_ptr来封装packaged_task,延长packaged_task的生命周期,从而实现异步的调用。
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
//将task封装到void function用于加入任务队列中
std::function<void()> warpper_func = [task_ptr]() {
(*task_ptr)();
};
//添加到任务队列
m_queue.push(std::move(warpper_func));
//唤醒一个等待中的线程
m_condition_variable.notify_one();
//返回task_ptr的结果,该结果是一个future对象,可以异步的获取函数返回值。
return task_ptr->get_future();
}

private:
//线程池是否关闭
bool m_shutdown;
//任务队列
SafeQueue<std::function<void()>> m_queue;
//工作线程队列
std::vector<std::thread> m_threads;
//条件变量,可以让线程处于休眠或者唤醒状态
std::condition_variable m_condition_variable;
std::mutex m_condition_mutex;
};

void ThreadWorker::operator()() {
//定义基础函数func
std::function<void()> func;

while (!m_pool->m_shutdown) {
{
std::unique_lock<std::mutex> lock(m_pool->m_condition_mutex);
//如果任务队列为空,阻塞当前线程
while (m_pool->m_queue.empty()) {
m_pool->m_condition_variable.wait(lock);
if (m_pool->m_shutdown) return;
}
//取出任务队列中的元素
m_pool->m_queue.pop(func);
}
func();
}
}

//----------------------------------test-------------------------------------
static std::random_device rd;
static std::mt19937 mt(rd());
//对[-1000,1000]进行均匀采样
std::uniform_int_distribution<int> dist(-1000,1000);
auto rnd = std::bind(dist, mt);

//模拟处理时间
void simulate_computation() {
std::this_thread::sleep_for(std::chrono::milliseconds(1000+rnd()));
}
std::mutex print_mutex;
//两个数字相乘的简单函数,并打印结果
void multiply(const int a, const int b) {
simulate_computation();
const int res = a * b;
std::unique_lock<std::mutex> lock(print_mutex);
std::cout << a << "*" << b << "=" << res << std::endl;
}
//带传入传出参数,并且有返回值
int multiply_return(int& out,const int a, const int b) {
simulate_computation();
out = a * b;
std::cout << a << "*" << b << "=" << out << std::endl;
return a + b;
}

void example() {
//创建9个线程的线程池
ThreadPool pool(9);
//初始化线程池
pool.init();
//提交乘法操作,总共20个
for (int i = 1; i <= 2; ++i) {
for (int j = 1; j <= 10; ++j) {
pool.submit(multiply,i,j);
}
}
int output_ref;
auto future = pool.submit(multiply_return, std::ref(output_ref), 3, 5);
//future.get()会等待函数执行完成,后得到结果继续运行。
std::cout << future.get() << std::endl;
}
int main()
{
example();
return 0;
}


C++11-ThreadPool
https://howl144.github.io/2022/03/21/00012. C++11-ThreadPool/
Author
Deng Ye
Posted on
March 21, 2022
Licensed under