小编个人主页详情<—请点击
小编个人gitee代码仓库<—请点击
linux系列专栏<—请点击
倘若命中无此运,孤身亦可登昆仑,送给屏幕面前的读者朋友们和小编自己!
前言
【linux】多线程(七)基于环形队列的生产者消费者模型,POSIX信号量: sem_init,sem_destroy,sem_wait,sem_post——书接上文 详情请点击<——,本文会在上文的基础上进行讲解,所以对上文不了解的读者友友请点击前方的蓝字链接进行学习
本文由小编为大家介绍——【linux】多线程(八)线程池
一、概念讲解
- 线程池使用了池化技术,以空间换时间。线程池是线程的一种使用模式。线程过多会带来调度开销,影响整体性能。而线程池维护者多个线程,等待着被派发可并发执行的任务。
- 这避免了在处理短时间任务时创建与销毁线程的代价,还能保证内核被充分利用,防止过度调度
- 应用场景:
(1)需要大量线程来完成任务,且完成任务的时间较短。例如:web服务器完成网页请求
(2)对性能要求苛刻的应用。例如:要求服务器快速响应用户请求
(3)接收突发性的大量请求,但不至于使服务器因此产生大量线程的应用 - 线程池运作过程:
(1)线程池创建固定数量的线程,循环从任务队列中获取任务对象
(2)获取到任务对象之后,执行任务对象的任务接口
- 如上就是线程池的原理了,那么线程池首先会在自己内部创建出一批线程,那么线程被创建出来之后会进行检测任务队列,如果任务队列没有任务,那么线程此时就没有任务要执行了,所以线程就去条件变量下去等待了,那么多个线程在最开始任务队列没有任务,所以就会依次全部都去条件变量下进行等待
- 当任务队列来任务的时候,此时线程池的线程都已经在条件变量下进行等待了,所以此时当任务入队列之后,就要唤醒在条件变量下进行等待的一个线程,让这个线程执行任务,当这个线程执行完成之后又会去检测任务队列有没有任务,如果没有它就继续会去条件变量下进行等待,以此循环往复下运行
- 并且任务队列只有一份,所以任务队列是一个共享资源,既然是共享资源,那么被多线程同时访问的时候就会有问题,那么就要干什么?对的通过互斥锁保护共享资源,所以当任务队列入任务的时候,应该申请锁,当任务队列入完任务的时候,应该释放锁。当条件变量检测任务队列执行任务队列的任务的时候应该申请锁,当执行完毕应该释放锁
- 并且当最开始任务队列没有任务的时候,线程池的线程只能在条件变量下进行等待,等待线程将任务入任务队列。那么这就表现出一定的顺序性了,即同步
- 所以这不就符合生产者消费者模型的321原则了
3种关系:(1)生产者与生产者:互斥(2)消费者与消费者:互斥(3)生产者与消费者:互斥,同步。
2种角色:(1)生产者(2)消费者。
1个场所:(1)特定结构的内存空间 - 其中的1个场所是任务队列,2种角色,线程池内的线程是消费者,向任务队列入任务的线程是生产者,3种关系,由于只有1个生产者,不存在多个生产者,所以(1)自然而然不需要满足,(2)多个消费者之间的关系通过互斥锁保证了互斥(3)生产者与消费者的互斥通过互斥锁保证,同步通过条件变量来保证,所以线程池的实现就是基于生产者消费者模型实现的
二、代码实现
Task.hpp
- 小编在之前的文章中已经讲解了如何模拟实现计算任务,所以这里小编就将任务直接拿来用,关于讲解后面的蓝字链接对应的文章中的第三点实现环形队列的单生产者单消费者模型中的第一点Task.hpp即计算任务的模拟实现的讲解,请点击阅读<——
ThreadPool.hpp
基本框架
- 由于作为线程池的编写者,我们并不知道线程池的任务队列中要放什么类型的任务,所以我们将线程池定义为模板类,并且定义一个静态全局变量defaultnum等于5作为线程池创建线程数量的默认值,为什么这里要设置为静态的全局变量,而不是全局变量,因为静态的全局变量的作用域仅仅在当前文件,其它文件无法使用该变量,而全局变量的作用域是当前项目的全部文件,除了当前文件可以使用这个变量,项目中的其它文件也可以使用这个变量,所以我们为了一定的封装性考虑,将defaultnum设置为静态全局变量
- 那么线程池线程池,首先就要有多个线程,既然是多个线程,那么线程就要有属性,这里的属性作为成员变量我们使用struct类ThreadInfo存储两个,第一个是线程的tid,第二个是线程的名字threadname
- 那么ThreadPool线程池的私有成员变量应该有一个为模板的类型为queue<T>的任务队列tasks_,还应该有一个vector<ThreadInfo>类型的容器threads_用于管理多个线程,还要有一个锁mutex_,一个条件变量cond_
- 那么对于一些函数我们期望在类内使用不对外使用,所以设置成私有成员函数,例如:Lock函数封装pthread_mutex_lock申请锁,Unlock封装pthread_mutex_unlock释放锁,ThreadSleep函数封装pthread_cond_wait在条件变量下等待,WakeUp函数封装pthread_cond_signal唤醒条件变量队头的线程,IsQueueEmpty函数封装任务队列的判空tasks_.empty(),GetThreadName函数用于根据形参线程的tid,for循环遍历管理线程的容器threads,然后找出与tid相等的ti,然后返回ti对应的线程名字
- 那么我们就在线程池的构造函数中,所以参数设置为线程池要创建线程的数目num,缺省参数为defaultnum即可,然后在初始化列表中对threads_进行扩容为num大小,这样可以减少频繁扩容的次数提高效率,并且这样就可以直接对空间的数据进行访问修改了,对于num,如果用户不传入,那么使用缺省值即可,如果用户显式传入,那么采用用户传入的即可,那么接下来对锁和条件变量进行初始化即可
- 在线程池的析构函数中,我们对锁和条件变量进行销毁即可
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <pthread.h>
static int defaultnum = 5;
struct ThreadInfo
{
pthread_t tid;
std::string threadname;
};
template<class T>
class ThreadPool
{
private:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void WakeUp()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
std::string GetThreadName(pthread_t tid)
{
for(auto& ti : threads_)
{
if(ti.tid == tid)
return ti.threadname;
}
return "None";
}
public:
ThreadPool(int num = defaultnum):threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
std::queue<T> tasks_;
std::vector<ThreadInfo> threads_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
};
Start
- 那么线程池给用户提供一个Start接口,当用户将线程池创建出来之后,如果不调用这个Start接口,那么线程池仅仅完成初始化的工作,并不创建线程,只有当用户调用这个start接口的时候线程池才创建线程
- 那么在这个Start函数中我们先求出,要创建线程的数量n,然后使用for循环循环n次即可,在每一次的循环中,我们设置一下对应的线程名即可,格式为"thread-" + (标号i + 1),采用i + 1的主要原因是相比于从0开始,从1开始更清晰一点
- 那么接下来使用pthread_create创建线程即可,值得注意的是HandlerTask线程函数我们是创建线程池这个class类内的,我们之前创建线程函数的时候都是直接定义在全局,那么定义在全局的时候线程函数的类型就是void*(*)(void*),即参数为void*,返回值为void*
- 而定义在class类内的线程函数的类型在明面是我们认为是void*(*)(void*),即参数为void*,返回值为void*,但是实际上不是,因为函数的第一个形参还有一个隐藏的this指针,而线程函数pthread_create要求线程函数的类型必须是void*(*)(void*),即参数为void*,返回值为void*的,所以此时如果贸然传入HandlerTask这个类的成员函数的线程函数就会出现类型不匹配,所以我们不能这样定义线程函数HandlerTask,同样的我们也不能将线程函数HandlerTask定义成全局的,因为这样会破环类的封装性,那么我们应该如何做呢?
- 很简单将线程函数HandlerTask定义成静态成员变量即可,这样就没有this指针了,所以类型也就符合pthread_create对于线程函数的要求了,但是这样线程函数HandlerTask是静态的成员函数,那么就无法访问类内的成员函数了,只能访问类的static成员函数或者static成员变量,而任务队列是queue<T>类型的,那么线程函数HandlerTask也就无法访问任务队列或者其它普通的类的成员接口了,所以怎么办呢?
- 很简单,将创建的线程的线程的this传入给线程函数HandlerTask不就好了,即将this指针作为pthread_create的第四个参数,那么这样就可以传给线程函数HandlerTask了,所以在线程函数HandlerTask内进行类型转换即可拿到this指针
void Start()
{
int n = threads_.size();
for(int i = 0; i < n; i++)
{
threads_[i].threadname = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, (void*)this);
}
}
线程函数HandlerTask
- 线程函数我们并不期望在类外可以进行调用,所以我们将线程函数设置为私有的static成员函数,那么由于参数是this已经传入给了args,this指针指向的是ThreadPool线程池对象,所以this指针的类型是ThreadPool*,所以我们将args进行类型转换即可拿到类型为ThreadPool*的this指针tp
- 所以我们就可以使用tp访问类内的成员函数以及成员变量了,所以我们就调用接口pthread_self获取当前线程的tid然后传参给tp调用的GetThreadName,其返回值就是线程名字去初始化线程名name即可
- 所以接下来要判断访问共享资源任务队列中是否有任务了,所以我们要先申请锁,然后再使用while循环判断任务队列是否为空,如果为空那么则让线程去在条件变量下等待即可,这里采用while循环可以有效避免线程伪唤醒的情况
- 那么走到下一步一定是当前任务队列不为空,或者线程由于任务队列加入了任务被唤醒的情况,所以线程就可以出Pop出任务了,Pop的返回值就是任务,所以我们接收任务即可,然后直接释放锁,没错,释放锁即可,因为此时线程已经拿到了任务了,已经访问完成了任务队列拿到任务了,当前线程已经在独立栈上有了应该执行什么样的任务如何执行任务了,目前已经不会对共享资源访问了,所以释放锁即可,然后执行任务,打印对应的线程名称name以及处理结果即可,上述过程应该是建立在一个while死循环中,因为我们期望线程可以一直运行等待处理任务
- 那么此时我们看Pop,这个Pop直接拿队头的任务即可,然后弹出队头即可,最后返回队头元素,Pop是在线程函数HandlerTask的中的申请锁以及释放锁之间进行调用的,所以当前Pop访问一定是持有锁的情况,所以Pop函数内部不需要再次申请锁了,由于当前线程执行到Pop的时候一定是持有锁的,所以Pop可以保证访问共享资源的时候只有当前线程这一个执行流进行访问,不会出现并发问题,并且Pop函数我们也不希望对外提供,所以我们将Pop设置成私有成员变量即可
private:
T Pop()
{
T out = tasks_.front();
tasks_.pop();
return out;
}
static void* HandlerTask(void* args)
{
ThreadPool* tp = static_cast<ThreadPool*>(args);
std::string name = tp->GetThreadName(pthread_self());
while(true)
{
tp->Lock();
while(tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();
std::cout << name << " run" << ", result: " << t.GetResult() << std::endl;
}
return nullptr;
}
Push
- Push是线程池对外提供的用于用户线程向线程池内入任务的接口,那么也一定是需要互斥访问,所以在任务入队列之前要申请锁,然后任务入队列,此时任务队列中已经入了一个任务,所以我生产者线程(用户线程)可以保证被唤醒的一个处于在条件变量下等待唤醒队列的一个线程一定有任务可以被执行,所以我WakeUp唤醒条件变量的队头正在等待的一个线程即可,最后由于已经任务入队列,并且唤醒了线程,所以任务入队列工作完成,最后释放锁即可
void Push(const T& in)
{
Lock();
tasks_.push(in);
WakeUp();
Unlock();
}
main.cpp
- 主线程先new一个线程池tp,然后获取运算符的数目len,设置随机数种子
- 此时虽然new了一个线程池,但是线程池内的线程还没有跑起来,所以主线程执行Start让线程池内的线程跑起来,由于此时任务队列还没有数据,所以这多个线程就会去条件变量下按照一定的顺序等待任务队列入任务
- 此时作为主线程的生产者线程,就要先获取数据,在实际的场景中这个数据应该是从用户,网络等进行获取,那么这里我们采用随机数的方式获取数据,左操作数和的值我们控制在0到9之间,所以应该采用随机数取模10,即取模获取左操作数,usleep休眠
- 这个休眠是必须的,因为随机数是基于时间生成的,如果代码跑的太快,那么左操作数的值概率可能和右操作数相同,由于休眠了,大概率右操作数的值大概率可能和左操作数不同,右操作数的值我们期望是0到4,所以随机数取模5,仅仅是0到4,右操作数据范围较小,这样有较大的可能得到0,那么这样就可以较大概率看到异常结果,让右操作数为0,发生除零错误,发生取模零错误,我们期望看到异常对应的错误码
- 计算运算符字符串opers的长度len,由于opers = "±*/%"所以我们计算出这个字符串的长度为5,然后让随机数取模5,这样就可以在0到4之间随机,然后我们采用下标+[ ]的方式拿操作符op即可
- 接下来采用上面的构建好的数据传入Task类构造出对象t,将任务Push交给线程池的任务队列,我们再打印一下交给了什么任务回显给用户即可
- 最后sleep一秒,让生产者生产数据不要太快,我们期望生产者不断的制作数据,并且生产数据到线程池的任务队列,所以整个过程应该是一个while死循环
- 最后为了防止内存泄漏,我们delete释放在堆上new出来的线程池tp
- 所以此时生产者生产慢,消费者消费满,所以我们可以观察到间隔1秒生产数据,然后消费者立刻消费数据
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "Task.hpp"
#include "ThreadPool.hpp"
int main()
{
srand(time(nullptr));
int len = opers.size();
ThreadPool<Task>* tp = new ThreadPool<Task>();
tp->Start();
while(true)
{
int data1 = rand() % 10;
usleep(10);
int data2 = rand() % 5;
char op = opers[rand() % len];
Task t(data1, data2, op);
tp->Push(t);
std::cout << "main thread make task: " << t.GetTask() << std::endl;
sleep(1);
}
delete tp;
return 0;
}
运行结果如下,无误
三、未使用单例优化的源代码
ThreadPool.hpp
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <pthread.h>
static int defaultnum = 5;
struct ThreadInfo
{
pthread_t tid;
std::string threadname;
};
template<class T>
class ThreadPool
{
private:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void WakeUp()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
std::string GetThreadName(pthread_t tid)
{
for(auto& ti : threads_)
{
if(ti.tid == tid)
return ti.threadname;
}
return "None";
}
T Pop()
{
T out = tasks_.front();
tasks_.pop();
return out;
}
static void* HandlerTask(void* args)
{
ThreadPool* tp = static_cast<ThreadPool*>(args);
std::string name = tp->GetThreadName(pthread_self());
while(true)
{
tp->Lock();
while(tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();
std::cout << name << " run" << ", result: " << t.GetResult() << std::endl;
}
return nullptr;
}
public:
ThreadPool(int num = defaultnum):threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
void Start()
{
int n = threads_.size();
for(int i = 0; i < n; i++)
{
threads_[i].threadname = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, (void*)this);
}
}
void Push(const T& in)
{
Lock();
tasks_.push(in);
WakeUp();
Unlock();
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
std::queue<T> tasks_;
std::vector<ThreadInfo> threads_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
};
main.cpp
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "Task.hpp"
#include "ThreadPool.hpp"
int main()
{
srand(time(nullptr));
int len = opers.size();
ThreadPool<Task>* tp = new ThreadPool<Task>();
tp->Start();
while(true)
{
int data1 = rand() % 10;
usleep(10);
int data2 = rand() % 5;
char op = opers[rand() % len];
Task t(data1, data2, op);
tp->Push(t);
std::cout << "main thread make task: " << t.GetTask() << std::endl;
sleep(1);
}
delete tp;
return 0;
}
Task.hpp
#pragma once
#include <iostream>
#include <string>
std::string opers = "+-*/%";
enum Err
{
DivZero = 1,
ModZero,
Unknown
};
class Task
{
public:
Task()
{}
Task(int data1, int data2, char op)
: data1_(data1), data2_(data2), oper_(op), result_(0), exitcode_(0)
{}
void operator()()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if (data2_ == 0)
exitcode_ = DivZero;
else
result_ = data1_ / data2_;
}
break;
case '%':
{
if (data2_ == 0)
exitcode_ = ModZero;
else
result_ = data1_ % data2_;
}
break;
default:
exitcode_ = Unknown;
break;
}
}
std::string GetTask()
{
std::string r = (std::to_string(data1_));
r += ' ';
r += oper_;
r += ' ';
r += std::to_string(data2_);
r += " = ?";
return r;
}
std::string GetResult()
{
std::string r = (std::to_string(data1_));
r += ' ';
r += oper_;
r += ' ';
r += std::to_string(data2_);
r += " = ";
r += std::to_string(result_);
r += ", [";
r += "exitcode: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
~Task()
{}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
makefile
ThreadPool:main.cpp
g++ -o $@ $^ -lpthread -std=c++11
.PHONY:clean
clean:
rm -f ThreadPool
四、单例模式
- 单例模式是一种常用的,经典的设计模式之一,是IT界的很多大佬针对一些常见的,经典的场景设计出来的解决方案,即设计模式
- 单例模式的特点就是一个类只能实例化出一个对象。例如在服务器开发的场景中,需要一个服务器将很多数据加载到内存,但是只需要一个服务器,所以就可以使用单例的类来构建服务器管理数据
- 单例模式有两种实现方式分别为懒汉模式和饿汉模式,关于懒汉模式以及饿汉模式小编在c++部分已经讲解过了所以这里小编就不再赘述了,详情请点击<——
- 懒汉模式最核心的就是延时加载,从而可以优化服务器的启动速度
- 而上面小编已经实现了线程池小程序了,下面小编则使用单例模式的实现方案的懒汉模式去对线程池小程序进行优化
五、单例模式进行优化
基本框架
- 懒汉模式要求在类中要有一个静态的成员变量,其实这个静态的成员变量的生命周期随进程,当进程被加载进来的时候,全局变量以及静态的成员变量就已经被创建了,只不过和全局变量相比,静态的成员变量的生命周期和全局变量相同都是随进程,但是静态的成员变量的作用域不同,静态的成员变量属于类,即静态的成员变量的作用域仅限于类域,全局变量的作用域则是全局
- 由于懒汉模式也是单例模式,所以就要求线程池类也只能创建一个对象,换句话来说我们就要将构造函数,析构函数私有化,并且使用防拷贝以及防赋值的手段delete关键字禁止拷贝和赋值
- 所以线程池的私有成员变量中需要有一个指向线程池的静态指针tp_,并且我们初始的时候应该把这个tp_在类外进行初始化,由于是模板,所以在类外进行初始化的时候要带上模板声明,下面的代码,为了便于观察,小编仅仅将懒汉模式的实现核心进行展现,将之前的线程池类的一些成员和变量暂时性忽略
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <pthread.h>
static int defaultnum = 5;
template<class T>
class ThreadPool
{
private:
ThreadPool(int num = defaultnum):threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
ThreadPool(const ThreadPool<T>& ) = delete;
const ThreadPool<T>& operator=(const ThreadPool<T>& ) = delete;
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
static ThreadPool<T>* tp_;
};
template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;
GetInstance
- 所以我们要对外提供一个获取单例对象的接口GetInstance,那么既然是对用户提供,但是这个GetInstance这个类又是成员函数,如果要在类外调用成员函数那么就必须要有对象,而GetInstance这个函数又是用于获取单例对象的,这不就自相矛盾了吗?如何解决呢?
- 那么我们将成员函数GetInstance使用static修饰,让GetInstance这个成员函数成为静态的成员函数,这样的话,这个GetInstance就属于类了,不再需要对象也能进行调用,使用类的类型进行调用即可
- 那么接下来就要判断指向唯一的类对象的指针tp_是否为空,如果为空,那么说明此时类对象还没有创建,那么我们打印一个语句之后,new一个对象创建,然后让tp_指针指向这个对象即可,最后返回tp_指针即可,如果tp_指针不为空,那么说明此时对象已经创建了,所以我们不进入if语句,最后返回tp_指针即可,如下
public:
static ThreadPool<T>* GetInstance()
{
if(tp_ == nullptr)
{
std::cout << "log: singleton create done first" << std::endl;
tp_ = new ThreadPool<T>();
}
return tp_;
}
- 可是上述小编编写的GetInstance这个静态成员函数线程安全吗?即如果多线程并发执行GetInstance这个静态成员函数的时候会不会有并发问题,而去创建了多个tp_对象,即创建了多个线程池对象呢?不是线程安全的,并且会有并发问题,如何理解呢?
- 观察上面的小编编写的GetInstance这个静态成员函数,在多线程场景中,例如有三个线程,即线程A,线程B,线程C同时执行GetInstance了,那么最开始的时候,线程A很幸运第一次进入这个GetInstance函数,那么判断tp_指针为空,那么if判断成立,进入if语句内部,打印完成语句,刚刚new完对象,还没来得及将对象的地址写到tp_指针对应的内存空间,此时由于线程A的时间片到了,所以线程A就保存好自己的上下文,然后线程A被切换走了,注意此时属于类的静态变量的指针tp_在内存中仍然为nullptr
- 所以此时调度器就将线程B切换上来了,线程B也进行判断if语句,tp_指针为空,所以线程B也进入了if语句内部,打印完成语句,刚刚new完对象,还没来得及将对象的地址写到tp_指针对应的内存空间,此时由于线程B的时间片到了,所以线程B就保存好自己的上下文,然后线程B被切换走了,注意此时属于类的静态变量的指针tp_在内存中仍然为nullptr,之后调度器就将线程C切换上来了
- 线程C和线程A和线程B一样,判断完if语句成立之后,也是刚刚new完对象,还没来得及将对象的地址写到tp_指针对应的内存空间,就由于时间片到了,然后突然就被切换走了,然后调度器就将线程A切换上来了
- 所以线程A就将new完的对象的地址写到tp_指针对应的内存空间,然后返回,此时tp_指针不为空,注意此时线程池对象被创建了一次,然后线程A的时间片到了,调度器就将线程B切换上来了
- 所以线程B就从上次被切换走的位置继续开始执行, 所以线程B就将new完的对象的地址写到tp_指针对应的内存空间,注意线程B和线程A进行new线程池对象返回的地址是不同的,所以线程B这里就将线程池对象创建了第二次,并且还将原有的线程A已经创建好的tp_指针的地址给覆盖掉了,造成了内存泄漏,然后线程B的时间片到了,调度器就将线程C切换上来了
- 线程C和线程B一样,所以线程C就将new完的对象的地址写到tp_指针对应的内存空间,所以线程C这里就将线程池对象创建了第三次,并且还将原有的线程B已经创建好的tp_指针的地址给覆盖掉了,又造成了内存泄漏,外加线程池对象被创建了三次,所以此时就有了线程安全问题,那么静态的成员变量的指针tp_就是一个被多个线程共享的共享资源,所以我们应该如何保证共享资源的安全呢?加锁
- 所以此时我们就应该对访问共享资源tp_的地方进行加锁,注意这里if语句判断tp_指针的值仍旧是访问了临界资源,所以我们应该将整个if语句进行申请锁释放锁,界定好临界区
- 那么关于这个锁,这个锁应该是使用static修饰的静态的一个锁,为什么?因为GetInstance这个函数是静态成员函数,静态成员函数只能访问静态成员变量,对于普通成员变量无法访问,所以如果这个锁设置成普通的成员变量那么对于GetInstance这个静态成员函数来说则无法进行访问,所以我们应该将这个锁设置为静态的锁,即static pthread_mutex_t lock_;
- 然后就需要对这个锁进行初始化了,可是这个锁是静态的锁,静态成员变量需要在类外进初始化,那么我们在类外又无法调用pthread_mutex_init这个函数对锁进行初始化,这可如何是好?
- 其实早在当初讲解锁的初始化的时候,小编讲解定义全局的锁的初始化的时候,就已经讲解过了,全局的锁可以使用PTHREAD_MUTEX_INITIALIZER进行初始化,而静态的属于类的锁其实全局的锁的生命周期一样,即本质一样都是随进程,只是作用域不同,所以静态的属于类的锁也可以使用PTHREAD_MUTEX_INITIALIZER进行初始化,并且使用PTHREAD_MUTEX_INITIALIZER进行初始化了之后,不需要进行销毁,冥冥之中好像已经有人为我们提前设计好了应对不同的场景所需要的方法和手段,所以我们就可以使用PTHREAD_MUTEX_INITIALIZER对我们的静态的属于类的局部的锁进行初始化了,并且不需要使用pthread_mutex_destroy进行销毁
- 所以此时我们的静态的锁已经初始化好了,然后就可以使用锁对共享资源访问tp_指针的整个if语句进行加锁保护了
template<class T>
class ThreadPool
{
public:
static ThreadPool<T>* GetInstance()
{
pthread_mutex_lock(&lock_);
if(tp_ == nullptr)
{
std::cout << "log: singleton create done first" << std::endl;
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
return tp_;
}
private:
static ThreadPool<T>* tp_;
static pthread_mutex_t lock_;
};
template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
- 但是编写到这里我们已经达到了圆满境界了,其实这样还是不够圆满,还没有到达大圆满境界,因为还有问题,还存在大量的无效率申请锁释放锁的情况
- 观察上面的代码,其实锁对临界资源tp_指针的保护仅限于第一次new线程池对象的时候,只要成功的第一次new了线程池对象,并且将tp_指针在内存的值修改为线程池对象的地址,即tp_指针已经指向第一次new线程池对象,所以后面申请到锁的线程即使进行if语句判断,那么tp_指针也不为空,那么也就不会进入if语句内部,然后再释放锁,最后返回tp_指针
- 所以如果存在多线程访问的时候,除了第一次创建线程池对象,并且修改tp_指针的线程,后续的线程执行都会出现频繁申请锁,不满足if语句条件,释放锁,然后返回,这个过程中存在了大量的频繁申请锁,释放锁,而中途又不满足if语句判断的情况,最后却直接返回tp_指针的情况,而频繁的申请锁,释放锁,底层也是封装的轻量化进程的系统调用,那么既然是系统调用就会有消耗,例如从用户态切换成内核态等,并且如果此时锁资源已经被其它线程申请走了,还会存在将当前线程挂起阻塞的情况,即涉及到对线程状态的修改,并且所有的线程对于申请锁,执行临界区的代码,释放锁都是串行执行的存在效率问题
- 所以我们能否有一种方案可以让后续的线程不要再频繁的申请锁释放锁了,因为尽管申请到了锁,也会由于tp_指针已经不为空了导致不会进入if语句,然后直接释放锁了,有方案的,再嵌套一层if语句即可
static ThreadPool<T>* GetInstance()
{
if(tp_ == nullptr)
{
pthread_mutex_lock(&lock_);
if(tp_ == nullptr)
{
std::cout << "log: singleton create done first" << std::endl;
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
}
return tp_;
}
- 观察上面我们在申请锁释放锁的外围再嵌套一层if语句判断即可,此时如果存在多线程并发访问,最初的时候tp_指针为空,那么可能会有几个线程进入第一个if语句的内部然后申请锁,此时只能有一个线程申请锁成功,假设第一个线程申请锁成功了,那么其它处于if语句内部的线程就会由于锁资源就绪,申请锁失败导致阻塞等待锁资源
- 那么tp_指针为空,所以这个第一个线程就判断第二个if语句成立,然后创建了线程池对象,并且让tp_指针指向这个线程池对象,即修改tp_指针在内存中的值,此时tp_指针就不为空了,然后第一个线程释放锁,然后返回tp_指针
- 此时锁资源已经被释放了,那么后续的少量线程就会被唤醒,然后由于tp_指针就不为空了,就会判断第二个if语句不成立,然后释放锁了,返回tp_指针
- 然后后续再有多个线程去执行GetInstance的线程的时候,就会直接由于tp_指针不为空,所以第一个if语句不成立,然后就不会进入第一个if语句内部了,进而也就不会申请锁,释放锁了,而是直接由于第一个if语句不成立然后返回tp_指针
main.cpp
- 所以我们在主线程中使用 线程池类的类型 加 :: 调用GetInstance获取单例对象创建线程池对象(单例对象),然后运行线程池,构建任务,执行GetInstance获取单例对象然后将任务Push入线程池的任务队列
- 这里小编就仅仅演示一个主线程调用单例对象的情况,感兴趣的读者友友可以让主线程创建多个新线程,让多个新线程在线程函数中,创建单例对象,创建任务,执行GetInstance获取单例对象然后将任务Push入线程池的任务队列
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "Task.hpp"
#include "ThreadPool.hpp"
int main()
{
srand(time(nullptr));
int len = opers.size();
ThreadPool<Task>::GetInstance()->Start();
while(true)
{
int data1 = rand() % 10;
usleep(10);
int data2 = rand() % 5;
char op = opers[rand() % len];
Task t(data1, data2, op);
ThreadPool<Task>::GetInstance()->Push(t);
std::cout << "main thread make task: " << t.GetTask() << std::endl;
sleep(1);
}
return 0;
}
运行结果如下,无误
六、单例模式优化后的源代码
ThreadPool.hpp
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <pthread.h>
static int defaultnum = 5;
struct ThreadInfo
{
pthread_t tid;
std::string threadname;
};
template<class T>
class ThreadPool
{
private:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void WakeUp()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
std::string GetThreadName(pthread_t tid)
{
for(auto& ti : threads_)
{
if(ti.tid == tid)
return ti.threadname;
}
return "None";
}
T Pop()
{
T out = tasks_.front();
tasks_.pop();
return out;
}
static void* HandlerTask(void* args)
{
ThreadPool* tp = static_cast<ThreadPool*>(args);
std::string name = tp->GetThreadName(pthread_self());
while(true)
{
tp->Lock();
while(tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();
std::cout << name << " run" << ", result: " << t.GetResult() << std::endl;
}
return nullptr;
}
public:
void Start()
{
int n = threads_.size();
for(int i = 0; i < n; i++)
{
threads_[i].threadname = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, (void*)this);
}
}
void Push(const T& in)
{
Lock();
tasks_.push(in);
WakeUp();
Unlock();
}
static ThreadPool<T>* GetInstance()
{
if(tp_ == nullptr)
{
pthread_mutex_lock(&lock_);
if(tp_ == nullptr)
{
std::cout << "log: singleton create done first" << std::endl;
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
}
return tp_;
}
private:
ThreadPool(int num = defaultnum):threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
ThreadPool(const ThreadPool<T>& ) = delete;
const ThreadPool<T>& operator=(const ThreadPool<T>& ) = delete;
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
std::queue<T> tasks_;
std::vector<ThreadInfo> threads_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
static ThreadPool<T>* tp_;
static pthread_mutex_t lock_;
};
template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
main.cpp
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "Task.hpp"
#include "ThreadPool.hpp"
int main()
{
srand(time(nullptr));
int len = opers.size();
// ThreadPool<Task>* tp = new ThreadPool<Task>();
// tp->Start();
ThreadPool<Task>::GetInstance()->Start();
while(true)
{
int data1 = rand() % 10;
usleep(10);
int data2 = rand() % 5;
char op = opers[rand() % len];
Task t(data1, data2, op);
ThreadPool<Task>::GetInstance()->Push(t);
std::cout << "main thread make task: " << t.GetTask() << std::endl;
sleep(1);
}
// delete tp;
return 0;
}
Task.hpp
#pragma once
#include <iostream>
#include <string>
std::string opers = "+-*/%";
enum Err
{
DivZero = 1,
ModZero,
Unknown
};
class Task
{
public:
Task()
{}
Task(int data1, int data2, char op)
: data1_(data1), data2_(data2), oper_(op), result_(0), exitcode_(0)
{}
void operator()()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if (data2_ == 0)
exitcode_ = DivZero;
else
result_ = data1_ / data2_;
}
break;
case '%':
{
if (data2_ == 0)
exitcode_ = ModZero;
else
result_ = data1_ % data2_;
}
break;
default:
exitcode_ = Unknown;
break;
}
}
std::string GetTask()
{
std::string r = (std::to_string(data1_));
r += ' ';
r += oper_;
r += ' ';
r += std::to_string(data2_);
r += " = ?";
return r;
}
std::string GetResult()
{
std::string r = (std::to_string(data1_));
r += ' ';
r += oper_;
r += ' ';
r += std::to_string(data2_);
r += " = ";
r += std::to_string(result_);
r += ", [";
r += "exitcode: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
~Task()
{}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
makefile
ThreadPool:main.cpp
g++ -o $@ $^ -lpthread -std=c++11
.PHONY:clean
clean:
rm -f ThreadPool
总结
以上就是今天的博客内容啦,希望对读者朋友们有帮助
水滴石穿,坚持就是胜利,读者朋友们可以点个关注
点赞收藏加关注,找到小编不迷路!