生产消费者模型的介绍以及其的模拟实现

  • 阿里云国际版折扣https://www.yundadi.com

  • 阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

    目录

    生产者消费者模型的概念

    生产者消费者模型的特点

    基于阻塞队列BlockingQueue的生产者消费者模型

    对基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现

    ConProd.c文件的整体代码

    BlockQueue.h文件的整体代码

    对【基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】的测试

    对基于计算任务的生产者消费者模型的模拟实现

    Task.h的整体代码

    ConProd.c文件的整体代码

    对【基于计算任务的生产者消费者模型的模拟实现】的测试

    多生产者多消费者模型的模拟实现以及对多生产者和多消费者模型的感悟

    ConProd.c文件的整体代码

    对【多生产者多消费者模型的模拟实现】的测试

    【单生产者单消费者模型】和【多生产者多消费者模型】的应用场景或者说意义包括如何选择模型比较合适


    生产者消费者模型的概念

    结合下图思考生产者消费者模式就是通过一个容器即容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯而通过容器来进行通讯所以生产者生产完数据之后不用等待消费者处理直接扔给容器消费者不找生产者要数据而是直接从容器里取容器就相当于一个缓冲区平衡了生产者和消费者的处理能力。这个容器就是用来给生产者和消费者解耦的。

    生产者消费者模型的特点

    生产者消费者模型是多线程同步与互斥的一个经典场景其特点如下

    如果对于线程控制、即线程的同步与互斥感到陌生请结合<<线程的互斥与同步>>一文进行阅读

    1、生产者消费者模型有三种关系 生产者和生产者互斥关系、消费者和消费者互斥关系、生产者和消费者互斥关系、同步关系。

    2、生产者消费者模型有两种角色 生产者和消费者。通常由进程或线程承担

    3、生产者消费者模型有一个交易场所 通常指的是内存中的一段缓冲区即一块内存空间可以自己通过某种方式组织起来。

    我们用代码编写生产者消费者模型的时候主要就是对以上三个特点进行维护。

    问题1那生产者和生产者、消费者和消费者、生产者和消费者它们之间为什么会存在互斥关系

    答案1介于生产者和消费者之间的容器可能会被多个执行流即进程或者线程同时访问因此我们需要将该临界资源即容器用互斥锁保护起来。其中所有的生产者和消费者都会竞争式的申请锁因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。

    问题2那生产者和消费者之间为什么会存在同步关系

    答案2假如不存在同步关系即不对生产者和消费者的行为进行控制那么会有可能出现两种情况。情况1、生产者生产的速度比消费者消费的速度快那么当生产者生产的数据将容器塞满后生产者再生产数据即往容器中插入数据就会生产失败因为这里的容器容量是固定的没有扩容一说。情况2、消费者消费的速度比生产者生产的速度快那么当容器当中的数据被消费完后消费者再进行消费就会消费失败。所以为了让生产数据和消费数据都不会出现失败的情况我们应该让生产者和消费者访问该容器时具有一定的顺序性比如让生产者先生产然后再让消费者进行消费所以生产者和消费者之间才会存在同步关系。

    注意在以上理论中互斥关系保证的是数据的正确性避免数据因为时序不一致而紊乱而同步关系是为了让多线程之间协同起来让生产者线程能成功的完成生产数据、让消费者线程能成功的完成消费数据、避免出现消费和生产失败的情况。

    基于阻塞队列BlockingQueue的生产者消费者模型

    在多线程编程中阻塞队列即BlockingQueue、注意本质就是STL的queue是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于当队列为空时某个消费者线程A从队列获取元素的操作将会被阻塞直到队列中被另一个生产者线程B放入了元素消费者线程A才能从阻塞状态恢复成运行状态从而继续从队列中获取元素当队列满时某个生产者线程A往队列里存放元素的操作也会被阻塞直到在另一个消费者线程B中有元素被从队列中被取出生产者线程A才能从阻塞状态恢复成运行状态从而继续往队列中存放元素。

    对基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现

    为了方便理解下面我们以单生产者线程、单消费者线程为例进行实现。

    这里说一下首先说明生产者消费者模型的编写思路然后说明阻塞队列BlockingQueue的编写思路。

    生产者消费者模型的编写思路如下

    在主函数中首先在堆上new创建一个阻塞队列BlockingQueue的对象然后创建两个线程一个充当生产者线程、另一个充当消费者线程。

    创建线程时需要一些参数所需参数1需要把阻塞队列BlockingQueue的对象传给两个线程作为两个线程的临界资源说一下阻塞队列BlockingQueue的对象能被两个线程作为临界资源是因为我们把阻塞队列对象的地址当作参数传给了两个线程。但实际上就算不传参因为阻塞队列是在堆上开辟的空间而堆上的数据是所有线程共享的所以阻塞队列对象还是能被两个线程看到能作为两个线程的临界资源。所需参数2一个是生产者线程函数将该函数传给生产者线程另一个是消费者线程函数将该函数传给消费者线程。函数不能凭空而来所以此时需要创建这两个函数才能将函数传给生产者和消费者线程。生产者线程函数的逻辑就是无限循环地把一个每次递增1的整数push进阻塞队列、消费者线程函数的逻辑就是无限循环地从阻塞队列中读出一个数据每读出一个数据都要把该数据从阻塞队列中删除。

    在创建线程完毕后立刻对生产者和消费者线程进行线程等待即调用pthread_join函数从而防止内存泄漏类似于防止僵尸进程造成的内存泄漏立刻编写是为了防止后序遗忘了这个步骤。

    ConProd.c文件的整体代码

    结合上面思路包含主函数的ConProd.c文件的整体代码如下。

    #include"BlockQueue.h"
    #include<pthread.h>
    #include<unistd.h>//提供sleep函数
    
    //消费者线程函数
    void* consumer(void*args)
    {
        BlockQueue<int> *bq = (BlockQueue<int>*)args;
        int a;//输出型参数
        
        //错误写法如下
        //while(bq->size()!=0)
        
        //正确写法如下为什么呢因为队列中没有数据时会被阻塞、等待生产者线程生产货物后继续消费而不是按照上面错误写法把交易场所中的货物消费完后就退出
        while(true)
        {
            bq->pop(&a);
            cout<<"消费了一个数据"<<a<<endl;        
        }
        return nullptr;
    }
    
    //生产者线程函数
    void* productor(void*args)
    {
        BlockQueue<int> *bq = (BlockQueue<int>*)args;
        int a=0;
        while(true)
        {
            cout<<"生产一个数据"<<a<<endl;
            bq->push(a);
            a++;
        }
        return nullptr;
    }
    
    
    int main()
    {
        BlockQueue<int> *bq = new BlockQueue<int>;
    
        pthread_t c,p;//为线程ID
        pthread_create(&c,nullptr, consumer, (void*)bq);
        pthread_create(&p,nullptr, productor, (void*)bq);
    
        pthread_join(c,nullptr);
        pthread_join(p,nullptr);
    
        return 0;
    }

    阻塞队列BlockingQueue类的编写思路如下

    首先确定BlockingQueue类内的成员变量如下。

    1、_q就不必解释了作为阻塞队列的底层容器其一定是需要存在的。说一下阻塞队列是生产者线程和消费者线程的交易场所所以阻塞队列是临界资源。

    2、_capacity在当前的消费者生产者模型中是不允许阻塞队列扩容的阻塞队列满了就需要让生产者线程在_In条件下等待。

    3、_mtx也不必解释阻塞队列作为生产者线程和消费者线程的交易场所即作为临界资源是一定需要锁保护的防止阻塞队列中的数据因时序性导致的数据紊乱。

    4、剩下的两个条件变量也是一定需要的它们的作用是控制生产者和消费者的行动顺序结合上文中的理论这里换句话说就是维护生产者和消费者的同步关系。

    然后说下BlockingQueue类内的成员函数如下。

    1、结合下图思考默认构造需要把BlockingQueue类内的2个成员条件变量_In、_Out和1个锁成员变量_mtx都通过对应的初始化函数初始化因为根据这些条件变量和锁变量的初始化规则它们都不是全局的变量只是局部的变量而局部的变量就只能通过这些初始化函数初始化。

    2、结合下图思考析构函数需要把BlockingQueue类内的2个成员条件变量_In、_Out和1个锁成员变量_mtx都通过下图的销毁函数销毁因为根据这些条件变量和锁变量的销毁规则因为这些变量只是局部变量所以在初始化时只能通过上图的3个函数而通过上图的3个函数初始化的变量就只能通过下图的3个销毁函数销毁。

    3、生产者线程函数push和消费者线程函数pop说一下这里push和pop严格意义来讲并不能称为生产者线程函数和消费者线程函数只是因为push函数在生产者线程函数productor中被调用了所以把push函数也称为了生产者线程函数pop被称为消费者线程函数的原因同理。push函数就是用于把push函数的参数传入队列queue中而pop函数就是用于把队列queue的队头front元素取出并拿到关于push和pop函数剩下的实现思路都在注释中详情请见下文中的代码。

    BlockQueue.h文件的整体代码

    结合上面理论BlockQueue.h的整体代码如下。

    #include<iostream>
    using namespace std;
    #include<queue>
    #include<pthread.h>
    
    template<class T>
    class BlockQueue
    {
    public:
        BlockQueue(int capacity = 5)
            :_capacity(capacity)
            {
                pthread_cond_init(&_Out,nullptr);
                pthread_cond_init(&_In,nullptr);
                pthread_mutex_init(&_mtx,nullptr);
            }
    
        ~BlockQueue()
        {
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_Out);
            pthread_cond_destroy(&_In);
        }
    
        //生产者线程函数
        void push(const T& x)
        {
            pthread_mutex_lock(&_mtx);//访问临界区需要加锁
            
            //1、先检测当前的临界资源是否能满足条件,如果阻塞队列满了表示交易场所中货物已经满了此时不能让生产者继续生产需要进入if分支让生产者在条件变量中等待
            //2、pthread_cond_wait函数存在虚假唤醒的情况这是多线程编程中的一个重要概念。虚假唤醒指的是在等待条件变量时即使条件变量的条件尚未满足线程也可能会被唤醒
            //但这并不是因为条件实际上已经满足而是因为一些其他的线程调用了pthread_cond_signal或pthread_cond_broadcast函数或者由于一些信号中断了pthread_cond_wait函数的等待
            //既然存在伪唤醒的情况我们就要想办法杜绝这种情况为了正确使用pthread_cond_wait我们需要while循环等待条件变量而不仅仅是在if语句内等待以处理虚假唤醒,通过这样的方式
            //就能100%确定临界资源是否满足条件。
    
            //依据上面理论错误示例如下
            //if(_q.size()==_capacity)
            
            //正确示例如下
            while(_q.size()==_capacity)
            {
                //pthread_cond_wait:我们竟然是在临界区中wait此时当前线程函数是持有锁的!如果该线程去等待被挂起了锁该怎么办呢?毕竟如果不解锁其他线程函数就没法访问临界区了。
                //pthread_cond_wait第二个参数是一个锁当成功调用wait之后传入的锁会被自动释放!
                //当我被唤醒时我从哪里醒来呢??答案从哪里阻塞就从哪里被唤醒所以就是在wait函数内部被唤醒。
                //但有一个问题在wait函数内部被唤醒后此时还在临界区内访问临界区需要获取到锁但此时没有锁怎么办呢答案不必担心被唤醒后在wait函数内部剩下的逻辑中会帮我们拿到锁
                pthread_cond_wait(&_In,&_mtx);
            }
            //访问临界资源
            _q.push(x);
    
            //1、push是生产者线程函数。对于消费者来说如果消费者在消费时把交易场所即队列中的货物消费完了此时消费者就会被阻塞在条件变量的等待队列中需要被人唤醒。
            //被谁唤醒呢只能是生产者为什么呢只有交易场所还有货物即队列中还有有效数据消费者才能被唤醒继续消费其他人不知道交易场所是否还有货物但作为生产者而言我刚刚才在
            //临界区中生产了一个货物所以我知道交易场所即队列中是一定是有货物的因为即使没有由于我刚生产了一个所以也会变成有货物所以才说只能由生产者去唤醒消费者。        
            //2、说一下这个pthread_cond_signal函数可以在临界区内即可以位于加锁和解锁函数之间也可以在临界区外没有区别。比如在临界区内时注意如果在内则必须位于访问临界资源的代码下面此时生产者线程还没有释放锁所以即使消费者
            //被唤醒但因为消费者没有锁所以还是会在消费者线程函数中的pthread_cond_wait函数处卡住这是因为在pthread_cond_wait函数内部有加锁函数加锁函数在等待锁所以就卡住了
            //所以消费者无法在没有锁的状态下访问临界区所以不必担心误访问再比如在临界区外时在执行到pthread_cond_signal函数但还没进入函数内时此时生产者线程已经释放了锁但因为此时消费者还没有被唤醒所以
            //也抢占不到锁在调用pthread_cond_signal函数把消费者唤醒后消费者线程还要在pthread_cond_wait函数内部等待锁的逻辑处卡住等拿到锁后才能访问临界区所以消费者也无法在没有锁的状态下访问临界区所以不必担心误访问。
            //综上所述因为两种情况导致的结果是相同的所以才说pthread_cond_signal函数可以在临界区内也可以在临界区外没有区别。
            //3、说一下如果消费者在消费时没有把交易场所即队列中的货物消费完、没有被阻塞在条件变量的等待队列中、不需要被人唤醒那在生产者线程函数中即在当前注释所在的函数中调用pthread_cond_signal函数唤醒消费者时消费者会丢弃掉这个唤醒信息
            //所以即使消费者不需要被唤醒这里调用pthread_cond_signal函数唤醒消费者线程也不会出现什么问题。
            pthread_cond_signal(&_Out);
    
            pthread_mutex_unlock(&_mtx);//退出临界区需要解锁
        }
    
        //消费者线程函数
        void pop(T* x)//x是输出型参数让调用pop的人拿到数据
        {
            pthread_mutex_lock(&_mtx);//访问临界区需要加锁
    
            //1、先检测当前的临界资源是否能满足条件,如果阻塞队列为空表示交易场所中已经没有货物了此时不能让消费者继续消费需要进入if分支让消费者在条件变量中等待
            //2、pthread_cond_wait函数存在虚假唤醒的情况这是多线程编程中的一个重要概念。虚假唤醒指的是在等待条件变量时即使条件变量的条件尚未满足线程也可能会被唤醒
            //但这并不是因为条件实际上已经满足而是因为一些其他的线程调用了pthread_cond_signal或pthread_cond_broadcast函数或者由于一些信号中断了pthread_cond_wait函数的等待
            //既然存在伪唤醒的情况我们就要想办法杜绝这种情况为了正确使用pthread_cond_wait我们需要while循环等待条件变量而不仅仅是在if语句内等待以处理虚假唤醒,通过这样的方式
            //就能100%确定临界资源是否满足条件。
    
            //依据上面理论错误示例如下
            //if(_q.size()==0)
            
            //正确示例如下
            while(_q.size()==0)
            {
                //pthread_cond_wait:我们竟然是在临界区中wait此时当前线程函数是持有锁的!如果该线程去等待被挂起了锁该怎么办呢?毕竟如果不解锁其他线程函数就没法访问临界区了。
                //pthread_cond_wait第二个参数是一个锁当成功调用wait之后传入的锁会被自动释放!
                //当我被唤醒时我从哪里醒来呢??答案从哪里阻塞就从哪里被唤醒所以就是在wait函数内部被唤醒。
                //但有一个问题在wait函数内部被唤醒后此时还在临界区内访问临界区需要获取到锁但此时没有锁怎么办呢答案不必担心被唤醒后在wait函数内部剩下的逻辑中会帮我们拿到锁
                pthread_cond_wait(&_Out,&_mtx);
            }
            //访问临界资源
            *x=_q.front();
            _q.pop();
    
            pthread_mutex_unlock(&_mtx);//退出临界区需要解锁
    
            //1、pop是消费者线程函数。对于生产者来说如果生产者在生产时把交易场所即队列产满了此时生产者就会被阻塞在条件变量的等待队列中需要被人唤醒。
            //被谁唤醒呢只能是消费者为什么呢只有交易场所没有被产满即队列中还有剩余空间生产者才能被唤醒继续生产其他人不知道交易场所是否满了但作为消费者而言我刚刚才在
            //临界区中消费了一次所以我知道交易场所即队列中是一定没有被产满的因为即使满了由于我刚消费了一个所以也会变成不满所以才说只能由消费者去唤醒生产者。        
            //2、说一下这个pthread_cond_signal函数可以在临界区内即可以位于加锁和解锁函数之间也可以在临界区外没有区别。比如在临界区内时注意如果在内则必须位于访问临界资源的代码下面此时消费者线程还没有释放锁所以即使生产者
            //被唤醒但因为生产者没有锁所以还是会在生产者线程函数中的pthread_cond_wait函数处卡住这是因为在pthread_cond_wait函数内部有加锁函数加锁函数在等待锁所以就卡住了
            //所以生产者无法在没有锁的状态下访问临界区所以不必担心误访问再比如在临界区外时在执行到pthread_cond_signal函数但还没进入函数内时此时消费者线程已经释放了锁但因为此时生产者还没有被唤醒所以
            //也抢占不到锁在调用pthread_cond_signal函数把生产者唤醒后生产者线程还要在pthread_cond_wait函数内部等待锁的逻辑处卡住等拿到锁后才能访问临界区所以生产者也无法在没有锁的状态下访问临界区所以不必担心误访问。
            //综上所述因为两种情况导致的结果是相同的所以才说pthread_cond_signal函数可以在临界区内也可以在临界区外没有区别。
            //3、说一下如果生产者在生产时没有把交易场所即队列产满、没有被阻塞在条件变量的等待队列中、不需要被人唤醒那在消费者线程函数中即在当前注释所在的函数中调用pthread_cond_signal函数唤醒生产者时生产者会丢弃掉这个唤醒信息
            //所以即使生产者不需要被唤醒这里调用pthread_cond_signal函数唤醒生产者线程也不会出现什么问题。
            pthread_cond_signal(&_In);
        }
    
    private:    
        queue<T> _q;  //阻塞队列代表交易场所
        int _capacity; //阻塞队列的容量上限避免queue扩容
        pthread_mutex_t _mtx; //通过互斥锁保证队列安全
        pthread_cond_t _In; //条件变量用它表示阻塞队列还有空间剩余即还可以继续填放货物
        pthread_cond_t _Out; //条件变量用它表示阻塞队列中还存在有效数据即还可以继续消费货物
    };
    
    

    对上面代码的补充说明

    1、由于我们实现的是单生产者、单消费者的生产者消费者模型因此我们不需要维护生产者和生产者之间的关系也不需要维护消费者和消费者之间的关系我们只需要维护生产者和消费者之间的同步与互斥关系即可。

    2、将BlockingQueue当中存储的数据模板化方便以后需要时进行复用。

    3、这里设置BlockingQueue存储数据的上限为5、即_capacity为5当阻塞队列中存储了五个数据时生产者就不能进行生产了此时生产者就应该被阻塞。

    对【基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】的测试

    将上面模拟实现部分的ConProd.c文件编译并运行后得到的结果如下图。说一下生产和消费数据是从整形1开始的下图只是运行结果的一小个片段从8342开始只是因为CPU运行的太快了一下就刷到了8千多。在CPU分给生产者线程和消费者线程的时间片中在这个时间片内已经足够让生产者把阻塞队列、即交易中心产满数据阻塞队列被产满后分给消费者的时间片也足够让消费者线程把阻塞队列、即交易中心中已经产满的数据全部消费完所以生产和消费的顺序如下图是连续生产5个后再连续消费5个。

    如果我想步调一致即生产一个就立马消费一个可以通过sleep函数让生产者生产的速度和消费者消费的速度都慢下来但速度相同注意双方sleep的时间一定是相等的。只需要在上文的ConProd.c的代码的基础上下图1就是ConProd.c的代码加上如下图1的两个红框处的代码这样即可完成步调一致让生产者每秒生产1个数据、消费者每秒消费1个数据运行结果如下图2。

    图1如下。

    图2如下下图只是运行结果中的一小段片段。

    如果我不想步调一致比如想让生产者生产的速度比消费者消费的速度要快只需要在上文的ConProd.c的代码的基础上下图1就是ConProd.c的代码加上如下图1的红框处的代码即可完成让生产者生产的速度比消费者消费的速度要快运行结果如下图2再比如想让生产者生产的速度比消费者消费的速度要慢只需要在上文的ConProd.c的代码的基础上下图3就是ConProd.c的代码加上如下图3的红框处的代码即可完成让生产者生产的速度比消费者消费的速度要慢运行结果如下图4。

    图1如下。

    图2如下下图只是运行结果中的一小段片段。可以看到因为生产者线程没有sleep所以一下子就把阻塞队列即交易场所给产满了后序消费者慢悠悠的消费数据每秒只消费1个然后消费完毕后生产者又立马重新把阻塞队列产满后序轮询【消费者每秒消费1个数据后、生产者又立马把阻塞队列产满】这样的操作。

    图3如下。 

    图4如下下图只是运行结果中的一小段片段。可以看到因为消费者线程没有sleep而生产者线程sleep了、在慢悠悠的每秒生产1个数据所以生产者线程每生产1个数据消费者线程立马就能把这1个数据给消费完。

    如果我们想满足某一条件时再唤醒对应的生产者线程或消费者线程比如可以当阻塞队列当中存储的数据大于队列容量的一半时再唤醒消费者线程进行消费这只需要在上文的BlockQueue.h的代码的基础上下图1就是BlockQueue.h的代码加上如下图1的红框处的代码即可运行结果如下图2。

    图1如下。

    图2如下下图只是运行结果中的一小段片段。因为阻塞队列的容量是5所以按理来说只有生产者生产3个数据后消费者才能开始消费数据可以看到下图结果是符合预期的。说一下这里我们是通过sleep限制了生产者线程函数生产的速度了的但没有让消费者线程去sleep如果不对生产者线程函数生成的速度进行限制则看到的结果就是程序在刚开始运行时生产者线程就能连续生产5个数据把阻塞队列产满然后消费者线程又能立刻连续消费5个数据把阻塞队列中的数据清空然后又连续生产5、然后连续消费5、往后循环这个现象。

    对基于计算任务的生产者消费者模型的模拟实现

    为了方便理解下面我们以单生产者线程、单消费者线程为例进行实现。

    【基于计算任务的生产者消费者模型】说简单点就是在上文中讲解过的【基于阻塞队列BlockingQueue的生产者消费者模型】的基础上把生产和消费的数据从整形数字变成了函数因此【基于计算任务的生产者消费者模型】的模拟实现只需要在【基于阻塞队列BlockingQueue的生产者消费者模型】的基础上做出一点修改即可。

    哪些修改呢

    Task.h的整体代码

    修改1首先创建一个Task.h文件在里面实现一个Task类Task.h的整体代码如下。

    #pragma once
    #include<iostream>
    using namespace std;
    #include<functional>
    
    typedef function<int(int,int)> func_t;//C++11的包装器
    
    class Task
    {
    public:
        Task()
        {}
    
        ~Task()
        {}
       
        Task(int x,int y,func_t f):_x(x), _y(y), _f(f)
        {}
    
        int operator()()
        {
            return _f(_x,_y);
        }
        
        int _x;
        int _y;
        func_t _f;
    };

    修改2然后在ConProd.c文件中#include"Task.h"并把BlockQueue的类型模板参数从int类变成Task类然后还要把生产者线程函数productor和消费者线程函数consumer的代码做修改。

    修改生产者线程函数的思路为通过srand和rand函数生成两个随机数x和y然后创建一个myadd函数然后把这3个变量的值传给Task变量t让t调用默认构造完成初始化然后把Task变量t插入push进阻塞队列中。

    修改消费者线程函数的思路为创建一个Task变量t然后让t作为输出型参数把t作为实参传给阻塞队列的pop函数的形参pop函数结束后Task变量t就被完成赋值了也就拿到了阻塞队列中的数据即Task然后以【cout<<"consumer:"<<t._x<<'+'<<t._y<<'='<<t()<<endl】的格式打印这个数据t()是调用了Task类的成员函数operator()。

    ConProd.c文件的整体代码

    按照上面的思路将ConProd.c文件中的生产者线程函数和消费者线程函数的代码修改后ConProd.c文件的整体代码如下。

    #include"BlockQueue.h"
    #include<pthread.h>
    #include"Task.h"
    #include<unistd.h>//提供sleep函数
    #include<time.h>//提供srand、rand函数
    
    
    int myadd(int x,int y)
    {
        return x+y;
    }
    
    //消费者线程函数
    void* consumer(void*args)
    {
        BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
    
        //错误写法如下
        //while(bq->size()!=0)
        
        //正确写法如下为什么呢因为队列中没有数据时会被阻塞、等待生产者线程生产货物后继续消费而不是按照上面错误写法把交易场所中的货物消费完后就退出
        while(true)
        {
            Task t;
            bq->pop(&t);
            cout<<"consumer:"<<t._x<<'+'<<t._y<<'='<<t()<<endl;
        }
        return nullptr;
    }
    
    //生产者线程函数
    void* productor(void*args)
    {
        srand((uint16_t)time(nullptr));
        BlockQueue<Task> *bq = (BlockQueue<Task>*)args;    
        while(true)
        {
            int x=rand()%1000;
            int y=rand()%1000;
            cout<<"productor:"<<x<<'+'<<y<<'='<<"?"<<endl;
            Task t(x, y, myadd);
            bq->push(t);
        }
        return nullptr;
    }
    
    
    int main()
    {
        BlockQueue<Task> *bq = new BlockQueue<Task>;
    
        pthread_t c,p;//为线程ID
        pthread_create(&c,nullptr, consumer, (void*)bq);
        pthread_create(&p,nullptr, productor, (void*)bq);
    
        pthread_join(c,nullptr);
        pthread_join(p,nullptr);
    
        return 0;
    }

    3、注意BlockQueue.h文件的代码是不需要经过任何修改的直接把上文中讲解【对基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】部分的BlockQueue.h文件拿来用即可。

    对【基于计算任务的生产者消费者模型的模拟实现】的测试

    将上面模拟实现部分的ConProd.c文件编译并运行后得到的结果如下图。说一下下图只是运行结果的一小个片段。在CPU分给生产者线程和消费者线程的时间片中在这个时间片内已经足够让生产者把阻塞队列、即交易中心产满数据阻塞队列被产满后分给消费者的时间片也足够让消费者线程把阻塞队列、即交易中心中已经产满的数据全部消费完所以生产和消费的顺序如下图是连续生产5个后再连续消费5个。

    如果我想步调一致即生产一个就立马消费一个可以通过sleep函数让生产者生产的速度和消费者消费的速度都慢下来但速度相同注意双方sleep的时间一定是相等的。只需要在上文的ConProd.c的代码的基础上下图1就是ConProd.c的代码加上如下图1的两个红框处的代码这样即可完成步调一致让生产者每秒生产1个数据、消费者每秒消费1个数据运行结果如下图2。

    图1如下。

    图2如下下图只是运行结果中的一小段片段。

    如果我不想步调一致比如想让生产者生产的速度比消费者消费的速度要快只需要在上文的ConProd.c的代码的基础上下图1就是ConProd.c的代码加上如下图1的红框处的代码即可完成让生产者生产的速度比消费者消费的速度要快运行结果如下图2再比如想让生产者生产的速度比消费者消费的速度要慢只需要在上文的ConProd.c的代码的基础上下图3就是ConProd.c的代码加上如下图3的红框处的代码即可完成让生产者生产的速度比消费者消费的速度要慢运行结果如下图4。

    图1如下。

    图2如下下图只是运行结果中的一小段片段。可以看到因为生产者线程没有sleep所以一下子就把阻塞队列即交易场所给产满了后序消费者慢悠悠的消费数据每秒只消费1个然后消费完毕后生产者又立马重新把阻塞队列产满后序轮询【消费者每秒消费1个数据后、生产者又立马把阻塞队列产满】这样的操作。

    图3如下。 

    图4如下下图只是运行结果中的一小段片段。可以看到因为消费者线程没有sleep而生产者线程sleep了、在慢悠悠的每秒生产1个数据所以生产者线程每生产1个数据消费者线程立马就能把这1个数据给消费完。

    多生产者多消费者模型的模拟实现以及对多生产者和多消费者模型的感悟

    在上文中不管是【对基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】还是【对基于计算任务的生产者消费者模型的模拟实现】我们说过为了方便理解之前模拟实现它们时都是只创建一个生产者线程和只创建一个消费者线程也就是单生产者单消费者模型。但实际上不管是【对基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】还是【对基于计算任务的生产者消费者模型的模拟实现】一般来说都应该是多生产者多消费者模型即应该同时创建多个生产者线程和创建多个消费者线程以上文中的【对基于计算任务的生产者消费者模型的模拟实现】为例进行修改将它从单生产者单消费者模型改成多生产者多消费者模型。

    哪些地方需要修改呢

    这里先插入一点和【哪些地方需要修改呢】的内容无关的内容然后再说明【哪些地方需要修改呢】的内容。

    对多生产者和多消费者模型的感悟先说一下这里咱们以知道了答案的视角下可以发现是没几个地方需要修改的对比修改前修改后的代码也就是多调用了几次创建线程的函数以此多创建几个线程然后多调用了几次等待线程的函数以此回收线程。为什么单生产单消费模型改成多生产多消费模型会这么容易呢或者说为什么从A【需要维护生产者和消费者的互斥关系、同步关系】变成B【在A的基础上现在又加上了需要维护生产者和生产者的互斥关系、消费者和消费者的互斥关系】会这么容易呢

    其原因是因为对于生产者和生产者之间、对于消费者和消费者之间、它们也受到互斥锁的限制、它们都是需要竞争锁才能进入临界区也就是阻塞队列完成各自任务的在多生产者和多消费者模型下的所有的线程不管是生产者还是消费者每次只能有一个线程进入临界区所以只靠互斥锁就能很好的维护生产者和生产者的互斥关系以及消费者和消费者的互斥关系。在单生产和单消费模型中我们的互斥锁其实就已经具备这些功能了即已经能很好的维护消费者和消费者的互斥关系、生产者和生产者的互斥关系了只是说我们在那时因为没有多个生产者、也没有多个消费者所以没有这种需求所以并不是说之前不可以维护【生产者和生产者的互斥关系、消费者和消费者的互斥关系】而只是不需要维护【生产者和生产者的互斥关系、消费者和消费者的互斥关系】如果我想做我是能做的既然我本来就能做所以从A【需要维护生产者和消费者的互斥关系、同步关系】变成B【在A的基础上现在又加上了需要维护生产者和生产者的互斥关系、消费者和消费者的互斥关系】才会这么容易。

    在上面插入了一些与正题无关的内容现在回到正题哪些地方需要修改呢

    1、如下图1需要将上文中【对基于计算任务的生产者消费者模型的模拟实现】部分中的ConProd.c文件中的main函数从左边的样子修改成右边的样子。然后如下图2需要将consumer和productor函数从左边的样子修改成右边的样子也就是把靠左的红框处的代码修改成靠右的红框处的代码。

    图1如下。

    图2如下。

    2、 注意Task.h文件的代码是不需要经过任何修改的直接把上文中讲解【对基于计算任务的生产者消费者模型的模拟实现】部分的Task.h文件拿来用即可。 

    3、注意BlockQueue.h文件的代码是不需要经过任何修改的直接把上文中讲解【对基于计算任务的生产者消费者模型的模拟实现】部分的BlockQueue.h文件拿来用即可。 

    ConProd.c文件的整体代码

    结合上面的思路进行修改后ConProd.c文件的整体代码如下。

    #include"BlockQueue.h"
    #include<pthread.h>
    #include"Task.h"
    #include<unistd.h>//提供sleep函数
    #include<time.h>//提供srand、rand函数
    
    
    int myadd(int x,int y)
    {
        return x+y;
    }
    
    //消费者线程函数
    void* consumer(void*args)
    {
        BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
    
        //错误写法如下
        //while(bq->size()!=0)
        
        //正确写法如下为什么呢因为队列中没有数据时会被阻塞、等待生产者线程生产货物后继续消费而不是按照上面错误写法把交易场所中的货物消费完后就退出
        while(true)
        {
            Task t;
            bq->pop(&t);     
            cout<<pthread_self()<<"consumer:"<<t._x<<'+'<<t._y<<'='<<t()<<endl;
        }
        return nullptr;
    }
    
    //生产者线程函数
    void* productor(void*args)
    {
        srand((uint16_t)time(nullptr));
        BlockQueue<Task> *bq = (BlockQueue<Task>*)args;    
        while(true)
        {
            int x=rand()%1000;
            int y=rand()%1000;
            cout<<pthread_self()<<"productor:"<<x<<'+'<<y<<'='<<"?"<<endl;
            Task t(x, y, myadd);
            bq->push(t);
        }
        return nullptr;
    }
    
    
    int main()
    {
        BlockQueue<Task> *bq = new BlockQueue<Task>;
    
        pthread_t c[2],p[2];//为线程ID
        pthread_create(c,nullptr, consumer, (void*)bq);
        pthread_create(c+1,nullptr, consumer, (void*)bq);
        pthread_create(p,nullptr, productor, (void*)bq);
        pthread_create(p+1,nullptr, productor, (void*)bq);
    
        pthread_join(c[0],nullptr);
        pthread_join(c[1],nullptr);
        pthread_join(p[0],nullptr);
        pthread_join(p[1],nullptr);
    
        return 0;
    }

    对【多生产者多消费者模型的模拟实现】的测试

    将上面模拟实现部分的ConProd.c文件编译并运行后得到的结果如下图。说一下下图只是运行结果的一小个片段。在CPU分给生产者线程和消费者线程的时间片中在这个时间片内已经足够让生产者把阻塞队列、即交易中心产满数据阻塞队列被产满后分给消费者的时间片也足够让消费者线程把阻塞队列、即交易中心中已经产满的数据全部消费完所以生产和消费的顺序如下图是连续生产5个后再连续消费5个。

    而且可以看到是有不同的线程在生产的每条语句开头部分的长串数字就是各自线程通过pthread_self()函数打印出的自己的线程ID也是有不同的线程的消费的这就是多生产者多消费者模型了。

    说一下、这里对生产者消费者步调一致和非步调一致的测试就不测了类似的内容在上文中已经有过两次测试了详情见上文对【基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】的测试部分和对【基于计算任务的生产者消费者模型的模拟实现】的测试部分。

    【单生产者单消费者模型】和【多生产者多消费者模型】的应用场景或者说意义包括如何选择模型比较合适

    问题在进程中有多个生产线程并且还有多个消费线程时因为有互斥锁的存在所以不管一个线程是消费者线程还是生产者线程只要是一个线程想要访问临界区即阻塞队列或者说交易场所就都得持有锁而当一个线程持有锁后其他所有线程都无法持有该锁也就是说在同一时间只能有一个线程访问临界区那这样貌似和单生产单消费的模型没有任何区别那多生产多消费的意义在哪呢请举例说明。

    答案如下不要肤浅地认为把任务Task或者说数据在临界区可以把它形象的称为交易场所存放或者取走或者说不要肤浅地认为线程访问临界区资源就是在生产和消费。生产任务本身和拿到任务后处理才是最消耗时间的把生产出的任务Task放到临界区和把临界区的任务拿走即访问临界区资源反而是最简单的。虽然多生产多消费的场景下和单生产单消费一样在同一时间也只有一个生产线程可以访问临界区但若干生产线程在访问临界区前即若干生产线程生产任务时是可以有多个生产线程并发的生产各自的任务的只是任务生产完毕后将任务送到临界区时在同一时间只能有一个生产线程可以访问临界区即把任务Task放进阻塞队列。同理虽然多生产多消费的场景下和单生产单消费一样在同一时间也只能有一个消费线程可以访问临界区即把任务Task从阻塞队列中接取出来但在访问完临界区后即若干消费者线程拿到临界区的任务后是可以有多个消费者线程并发的执行各自的任务的。这才是多生产多消费的价值。

    举个例子说明多生产者多消费者模型的意义。在单生产者单消费者模型下消费者线程从阻塞队列即交易场所中接取到一个任务后需要等待键盘或者网络资源就绪如果这个资源一直不就绪那么这个消费者线程就一直卡着与此同时生产者线程正有条不紊的持续生产任务并在拿到锁后把任务放进阻塞队列中此时进程中总共两个线程即一个生产者线程和一个消费者线程因为消费者线程在等待键盘或者网络资源处于阻塞状态所以此时没有线程和生产者线程抢互斥锁生产者线程一直能抢占锁成功等到阻塞队列被放满了任务、再也放不下后消费者线程还是在等待键盘或者网络资源就绪还没有把最初的任务处理完此时生产者线程就呼叫消费者线程说“消费者线程啊你快点来接取任务吧”但消费者线程连当前任务都没有处理完所以更不可能再去接任务所以这就是单生产者单消费者模型的缺点此时如果有多个消费者线程其他消费者线程就能帮忙缓解压力各自去接取任务后处理任务。所以当有类似的情景此时就应该使用多生产者多消费者模型。

    ————end————

    理解了上几段中说明的多生产者多消费者模型的意义后可以发现上文中模拟实现多生产者多消费者模型时虽然这个模拟实现是正确的但它对比我们模拟实现的单生产者单消费者模型其实是体现不出来优势的因为在上文模拟实现的多生产者和多消费者模型中每个生产者线程和消费者线程处理的任务都太简单了生产者生产任务的流程就是创建一个Task变量t通过两个整形值和一个函数初始化 t 后就将t插入push进队列queue中消费者消费任务的流程就是将一个个Task变量从队列queue从取出然后调用一下Task变量中的operator()函数这些操作都不需要什么时间成本可能几纳秒就执行完毕了所以不会出现某个生产者或者消费者线程处于忙碌的情况所以也就不太需要其他线程来帮忙分担压力只需要一个生产者线程和一个消费者线程就足够流畅地运作了所以才说上文中模拟实现多生产者多消费者模型时虽然这个模拟实现是正确的但它对比我们模拟实现的单生产者单消费者模型其实是体现不出来优势的。

    如何选择模型比较合适

    从上一段我们也能得出一个启示要在合适的情景下选择合适的模型不要无脑地使用多生产者多消费者模型有时单生产者单消费者模型其实更好用。选择模型的依据为如果生产者和消费者线程在生产或者消费任务时可能很耗时间注意不要将【生产或者消费任务】这些动作和【把任务放进阻塞队列或者从阻塞队列中取出】混淆它们是不一样的则使用多生产者多消费者模型如果生产者和消费者线程在生产或者消费任务时所花的时间很短则使用单生产者单消费者模型。

  • 阿里云国际版折扣https://www.yundadi.com

  • 阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

    “生产消费者模型的介绍以及其的模拟实现” 的相关文章