跳转至

生产者消费者模型(Producer-Consumer Model)

约 5201 个字 530 行代码 3 张图片 预计阅读时间 24 分钟

基本介绍

生产者消费者模型(或称生产消费模型)是一种常见的多线程并发控制模型,用于解决共享资源冲突问题。在这个模型中,存在着两种角色:

  1. 生产者:表示数据的提供方或者数据的来源
  2. 消费者:表示数据的接收方或者数据的处理

基本过程是在一个程序中生产和消费产品,两个行为分别对应着两个线程,一个线程负责生产,一个线程负责消费,并且消费模式为:生产一个产品紧接着消费一个产品,不可以产生同时生产和同时消费,基本示意图如下:

因为存在着两个角色,所以本质上存在着3种关系:

  1. 生产者与生产者:即同一任务的多个线程,因为彼此都执行着同一个任务(将数据放置到仓库中),所以彼此是互斥的
  2. 消费者与消费者:即同一任务的多个线程,因为彼此都执行着同一个任务(从仓库中取出数据),所以彼此是互斥的
  3. 生产者和消费者:执行不同任务的两个线程,当任意一个线程正在访问仓库时,另外一个线程不可以访问仓库,此时是互斥的。另外,为了保证消费者和生产者交替工作,还需要保证二者是同步的

综上,生产消费模型本质遵循着「321」原则:

  1. 3种角色关系
  2. 2个角色
  3. 1个交易场所

在下面的实现中,先实现阻塞队列版本,再实现循环队列版本

阻塞队列版本介绍

生产消费模型就是通过一个容器来解决生产者和消费者的强耦合问题。具体来说就是生产者和消费者彼此之间不直接通讯,而通过阻塞队列(即交易场所)来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔进阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的

在本次实现中,首先考虑实现只有一个生产者和只有一个消费者的模式(单生产者单消费者),再进一步扩展到多生产者和多消费者的模式

(阻塞队列)基于线程同步与互斥实现基于单生产者单消费者的生产消费模型

设计阻塞队列

首先设计生产者和消费者的交易场所,对于这个阻塞队列来说,其要实现的功能实际上主要就只有两个,一个是生产者放入数据,另外一个是消费者读取数据。本次考虑底层使用到C++的队列结构作为阻塞队列的底层结构,所以基本代码结构如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class BlockQueue
{
public:
    BlockQueue()
    {
    }

    void pushData(int data)
    {
        _bq.push(data);
    }

    void popData(int *out)
    {
        *out = _bq.front();
        _bq.pop();
    }

    ~BlockQueue()
    {
    }

private:
    std::queue<int> _bq;
};

接着,既然是交易场所,那么肯定有大小限制,所以阻塞队列一定有最大容量,但是在C++中,队列结构会自动扩容,所以需要一个成员_maxSize表示当前阻塞队列的最大容量,至于这个容量为多少,可以设计为两种:

  1. 给定缺省值,并且支持用户自行设定
  2. 直接写死

本次考虑使用第一种方式。既然有了最大容量和最小容量(即没有任何数据),那么在插入和获取数据时一定要保证插入时阻塞队列不为满以及获取时阻塞队列不为空,所以还需要提供两个接口分别判断当前阻塞队列是否是满状态或者空状态,因为这两个接口只需要类内使用,所以可以考虑直接作为类内成员函数:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
private:
    bool isFull()
    {
        return _bq.size() == _maxSize;
    }

    bool isEmpty()
    {
        return _bq.empty();
    }

对应地需要修改插入和获取函数。对于这两个函数,考虑下面的思路:

  1. 插入数据函数:如果阻塞队列为满,说明此时不允许插入,那么一旦有线程调用插入函数判断当前阻塞队列已满就可以考虑进入等待,直到下一次阻塞队列为空时再唤醒其中一个生产线程。这种思路的好处是,一旦插入线程被唤醒,那么此时阻塞队列一定是至少有一个空位置插入数据的,而因为只唤醒了一个生产线程,所以也只会插入一个数据,空间是完全足够的,此时当前线程离开判满的分支后直接插入数据即可
  2. 获取数据函数:如果阻塞队列为空,说明此时不允许获取数据,那么一旦有线程调用获取函数判断当前阻塞队列已经为空就可以考虑进入等待,直到下一次阻塞队列至少有一个数据时再唤醒其中一个消费线程。这种思路的好处是,一旦获取线程被唤醒,那么此时阻塞队列一定是至少有一个数据,而因为只唤醒了一个插入线程,所以也只会获取到一个数据,所以不会出现为空时获取的情况

根据上面的思路可以看出,需要一把锁和两个条件变量:

  1. 一把锁:控制3种关系都实现互斥,保证阻塞队列同一时刻一定只有一个线程在访问
  2. 两个条件变量:控制生产者和消费者。当生产者插入数据时发现阻塞队列已满,进入等待,一旦消费者获取数据,消费者根据生产者的条件变量唤醒生产者,代表生产者此时可以生产数据;当消费者读取数据时发现阻塞队列为空,进入等待,一旦生产者生产数据,生产者根据消费者的条件变量唤醒消费者,代表消费者此时可以获取数据

另外,为了确保代码的健壮性,可以考虑唤醒前先确定是否对应条件变量下有线程正在等待,可以通过设置两个计数器分别表示当前正在等待的生产者线程个数和消费者线程个数

所以修改后的代码如下:

Note

下面的代码使用到前面对互斥锁条件变量的封装

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class BlockQueue
{
private:
    bool isFull()
    {
        return _bq.size() == _maxSize;
    }

    bool isEmpty()
    {
        return _bq.empty();
    }

public:
    BlockQueue(size_t maxSize = 5)
        : _maxSize(maxSize), _p_wait_num(0), _c_wait_num(0)
    {
    }

    void pushData(int data)
    {
        // 插入数据前抢锁
        MutexGuard mutex(_lock);
        if (isFull())
        {
            _p_wait_num++;
            // 满时生产者等待
            _p_cond.wait(_lock);
            _p_wait_num--;
        }

        // 此时一定有空位置
        _bq.push(data);

        // 通知消费者可以获取数据
        if (_c_wait_num)
            _c_cond.notify();
    }

    void popData(int *out)
    {
        // 获取数据前抢锁
        MutexGuard mutex(_lock);
        if (isEmpty())
        {
            _c_wait_num++;
            // 空则等待
            _c_cond.wait(_lock);
            _c_wait_num--;
        }

        // 此时一定有数据
        *out = _bq.front();
        _bq.pop();

        // 通知生产者可以生成数据
        if (_p_wait_num)
            _p_cond.notify();
    }

    ~BlockQueue()
    {
    }

private:
    std::queue<int> _bq;
    size_t _maxSize;    // 最大容量
    Mutex _lock;        // 一把锁
    Condition _p_cond;  // 生产者条件变量
    Condition _c_cond;  // 消费者条件变量
    size_t _p_wait_num; // 生产者等待数量
    size_t _c_wait_num; // 消费者等待数量
};

设计主逻辑

在主逻辑中,首先需要让两个线程看到阻塞队列,所以可以考虑将阻塞队列定义为全局变量,如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
#include "BlockQueue.hpp"

using namespace BlockQueueModule;

// 阻塞队列
BlockQueue bq;

int main()
{

    return 0;
}

接着,在主函数中创建一个生产者线程和一个消费者线程,这两个线程分别执行自己的生成函数和消费函数,再启动线程和等待线程:

Note

下面的代码使用到前面封装的线程库

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void producer()
{
}

void consumer()
{
}

int main()
{
    // 创建生产者线程
    Thread pro(producer);

    // 创建消费者线程
    Thread con(consumer);

    // 启动两个线程
    pro.start();
    con.start();

    // 等待两个线程
    pro.join();
    con.join();

    return 0;
}

在生产者的执行函数producer中,考虑让生产者持续生产数据,并且为了能看到生成的数据差异,可以考虑每一次生产完数据后改变该数据;在消费者的执行函数consumer中,考虑让消费者持续读取数据,代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void producer()
{
    int data = 10;
    while (true)
    {
        sleep(1);

        bq.pushData(data);
        std::cout << "生产者:" << pthread_self() % 1000 << "生产了一个数据:" << data << std::endl;

        data += 10;
    }
}

void consumer()
{
    while (true)
    {
        sleep(1);

        int data = 0;
        bq.popData(&data);

        std::cout << "消费者:" << pthread_self() % 1000 << "获取到一个数据:" << data << std::endl;
    }
}

编译运行上面的代码即可看到生产者先生产数据,消费者再拿数据,二者有序进行:

(阻塞队列)基于线程同步与互斥实现基于多生产者多消费者的生产消费模型

简易版多生产者多消费者模型+潜在问题解决

完成单生产单消费的生产消费模型后,推广到多生产多消费模型最容易考虑到的思路就是直接添加线程个数,并且在唤醒线程时不是唤醒其中一个线程而是唤醒所有生产或者消费线程进行生产或者消费,在这种情况下实现多生产多消费下的生产消费模型对应的代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// ...
int main()
{
    // 创建生产者线程
    Thread pro1(producer);
    Thread pro2(producer);
    Thread pro3(producer);

    // 创建消费者线程
    Thread con1(consumer);
    Thread con2(consumer);
    Thread con3(consumer);

    // 启动多个线程
    pro1.start();
    pro2.start();
    pro3.start();
    con1.start();
    con2.start();
    con3.start();

    // 等待多个线程
    pro1.join();
    pro2.join();
    pro3.join();
    con1.join();
    con2.join();
    con3.join();

    return 0;
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
void pushData(int data)
{
    // 插入数据前抢锁
    MutexGuard mutex(_lock);
    if (isFull())
    {
        _p_wait_num++;
        // 满时生产者等待
        _p_cond.wait(_lock);
        _p_wait_num--;
    }

    // 此时一定有空位置
    _bq.push(data);

    // 通知消费者可以获取数据
    if (_c_wait_num)
        _c_cond.notify();
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
void popData(int *out)
{
    // 获取数据前抢锁
    MutexGuard mutex(_lock);
    if (isEmpty())
    {
        _c_wait_num++;
        // 空则等待
        _c_cond.wait(_lock);
        _c_wait_num--;
    }

    // 此时一定有数据
    *out = _bq.front();
    _bq.pop();

    // 通知生产者可以生成数据
    if (_p_wait_num)
        _p_cond.notify();
}

运行上面的代码会发现貌似没有什么问题,生产者生产一个数据,消费者消费一个数据,但是其实上面的代码还潜藏着一个问题:如果插入函数或者获取函数的wait函数并没有成功让线程进行等待,但是又释放了线程的锁。此时就会出现一种伪唤醒的状态,即本应该等待的线程却被唤醒了,并且这个线程因为没有锁继续向后执行,此时如果已经有一个线程被正常唤醒并且阻塞队列又刚好只有一个空间就会出现问题

为了解决上面提到的问题,可以考虑将if改成while,此时尽管会存在等待问题,因为下一次还是会进行条件判断,如果条件成立依旧会进入等待逻辑。修改后的代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
void pushData(int data)
{
    // 插入数据前抢锁
    MutexGuard mutex(_lock);
    while (isFull())
    {
        _p_wait_num++;
        // 满时生产者等待
        _p_cond.wait(_lock);
        _p_wait_num--;
    }

    // 此时一定有空位置
    _bq.push(data);

    // 通知消费者可以获取数据
    if (_c_wait_num)
        _c_cond.notify();
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
void popData(int *out)
{
    // 获取数据前抢锁
    MutexGuard mutex(_lock);
    while (isEmpty())
    {
        _c_wait_num++;
        // 空则等待
        _c_cond.wait(_lock);
        _c_wait_num--;
    }

    // 此时一定有数据
    *out = _bq.front();
    _bq.pop();

    // 通知生产者可以生成数据
    if (_p_wait_num)
        _p_cond.notify();
}

解决多个线程访问同一个数据的问题

在上面的多生产多消费的生产消费模型中,存在的一个问题就是「多个生产者生产同一个数据或者多个消费者消费同一个数据」,这个问题本质就是因为生产线程的执行函数中data是一个局部变量,因为每个线程都会有自己的栈帧空间,所以data本质在多个线程中是独立的,所以起始时都会从10开始,导致每一次都是同一个值

解决这个问题也很简单,只需要将data变量作为全局变量即可:

C++
1
2
3
4
5
6
int data = 0;

void producer()
{
    // ...
}

循环队列版本介绍

循环队列版本的生产消费模型本质就是利用了信号量实现并发处理,在前面的阻塞队列版本,不论是单生产、消费线程版本还是多生产、消费线程版本,都无法做到生产的同时消费,但是实际上如果可以支持边生产边消费且生产一定在消费之前就可以一定程度上提高整个模型的并发性,从而提高效率,而循环队列版本的生产消费模型就可以解决这个问题

所谓的循环队列就是一种首尾相连的队列结构,在算法:栈和队列基础练习中提到过循环队列的设计,其中最主要关注的问题就是为空和为满如何区别以及如何处理环形。实际上,在本次实现中,只需要考虑如何处理环形即可,对于为空和为满只需要通过信号量进行控制,下面是具体分析:

因为生产消费模型中存在两个对象,为了使二者可以同时进行就必须要两个指针,定义为produceconsume,刚开始时,produceconsume指针指向同一个位置,如果是在设计循环队列中,此时需要考虑produceconsume能否指向同一位置,如果指向同一个位置如何区分为空和为满,而在有信号量之后,假设信号量的值表示当前循环队列还剩余的空间个数,考虑一种思路:

  1. 生产者执行生产操作,此时就申请生产者信号量,代表P操作。如果循环队列没有满(生产者信号量不为0),那么就可以正常生产数据并更新produce指针向后移动,否则就可以唤醒消费者进行消费,而唤醒消费者的行为就是V操作,因为要增加消费者的信号量
  2. 消费者执行消费操作,此时就申请消费者信号量,代表P操作。如果循环队列不为空(消费者信号量不为0),那么就可以正常消费数据并更新consume指针向后移动,否则就需要唤醒生产者进行生产,而唤醒生产者的行为就是V操作,因为要增加生产者的信号量

这样,生产者和消费者就是交替执行,并且初始化时,只要保证生产者的信号量不为0,消费者的信号量为0,就可以保证不需要进行判断空和判断满的操作,因为如果生产者先运行,那么循环队列为空时,生产者一定可以生产数据,如果循环队列为满,只有消费者先消费了才会唤醒生产者生产,并且因为是消费者唤醒生产者,让生产者信号量+1,所以一定会有一个空间。在这整个过程中,消费者和生产者的位置情况一定只有两种:

  1. 二者处于同一个位置:为空或者为满
  2. 二者不处于同一个位置:生产者一定在消费者之前,并且二者之间的距离绝对不会形成一个新循环队列(即已经更新数据的位置不会在没被消费之前覆盖)

根据上面的思路,可以抽象出生产者和消费者的逻辑如下:

Text Only
1
2
3
4
5
6
P(生产者信号量)

向循环队列插入数据
更新下标

V(消费者信号量)
Text Only
1
2
3
4
5
6
P(消费者信号量)

向循环队列插入数据
更新下标

V(生产者信号量)

(循环队列)基于信号量实现基于单生产者单消费者的生产消费模型

设计循环队列

本次考虑循环队列底层使用数组实现,所以可以考虑使用vector容器,但是同样因为vector会自动扩容,所以需要一个表示最大容量的成员,并且循环队列需要提供插入数据和获取数据的接口,所以循环队列基本结构如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class CycleQueue
{
public:
    CycleQueue(int val = 5)
        : _maxSize(val), _cq(val)
    {
    }

    void pushData(const int &data)
    {
    }

    void popData(int *data)
    {
    }

    ~CycleQueue()
    {
    }

private:
    std::vector<int> _cq;
    size_t _maxSize;
};

Note

需要注意,在上面的构造函数中,一定要对数组大小进行初始化,否则数组当前是未开辟空间的状态,直接使用下标会报越界错误

接着,根据上面的分析,需要一个生产者指针produce和一个消费者指针consume,二者均初始化为0:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CycleQueue(int val = 5)
    // ...
    , _produce(0)
    , _consume(0)
{
}
// ...
private:
    // ...
    int _produce;    // 生产者指针
    int _consume;    // 消费者指针

因为需要通过信号量控制生产者和消费者交替运行,所以需要两个信号量,并且生产者一定要先于消费者运行,而生产者信号量表示当前循环队列的空间个数,消费者信号量表示是否存在有效数据,所以生产者信号量初始化为循环队列的容量,表示初始时生产者可以生产数据,消费者信号量初始化为0,表示初始时消费者不可以获取数据。代码如下:

Note

需要注意,下面的代码使用到上一节封装的信号量

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CycleQueue(int val = 5)
    // ...
    ,_p_sem(val)
    , _c_sem(0)
{
}
// ...
private:
    // ...
    Sem _c_sem;      // 消费者信号量
    Sem _p_sem;      // 生产者信号量

接下来考虑设计插入数据的函数和获取数据的函数

对于插入数据的函数来说,当生产者调用该函数时,首先需要申请信号量,接着一旦申请成功,就可以考虑插入数据并更新下标,一系列操作完成之后就可以更新生产者的信号量,对于获取数据的函数也是同理:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
void pushData(const int &data)
{
    // 申请信号量
    _p_sem.wait();

    // 申请成功后插入数据
    _cq[_produce] = data;

    // 更新下标
    _produce++;
    _produce %= _maxSize;

    // 更新消费者信号量
    _c_sem.signal();
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
void popData(int *data)
{
    // 申请信号量
    _c_sem.wait();

    // 申请成功后获取数据
    *data = _cq[_consume];

    // 更新下标
    _consume++;
    _consume %= _maxSize;

    // 更新生产者信号量
    _p_sem.signal();
}

设计主逻辑

为了让两个线程看到同一个循环队列,考虑将循环队列对象定义为全局对象:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
#include "CycleQueue.hpp"

using namespace CycleQueueModule;

// 循环队列
CycleQueue cq;

int main()
{

    return 0;
}

分别创建一个生产者线程和一个消费者线程:

Note

下面的代码使用到前面封装的线程库

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// #include "BlockQueue.hpp"
#include "CycleQueue.hpp"
#include "thread.hpp"

using namespace ThreadModule;
using namespace CycleQueueModule;

// 循环队列
CycleQueue cq;

void producer()
{
}

void consumer()
{
}

int main()
{
    // 创建生产者线程
    Thread pro1(producer);

    // 创建消费者线程
    Thread con1(consumer);

    // 启动线程
    pro1.start();
    con1.start();

    // 等待线程结束
    pro1.join();
    con1.join();

    return 0;
}

最后,设计生产者线程的执行函数和消费者线程的执行函数,设计思路与阻塞队列部分一致,此处不再赘述:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void producer()
{
    while (true)
    {
        sleep(1);

        cq.pushData(data);
        std::cout << "生产者放入一个数据:" << data << std::endl;
        data += 10;
    }
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
void consumer()
{
    while (true)
    {
        sleep(1);

        int temp = 0;

        cq.popData(&temp);

        std::cout << "消费者获取一个数据:" << temp << std::endl;
    }
}

编译运行上面的代码就可以看到运行效果为生产者先生产,随后生产者生产一个数据,消费者拿取一个数据:

(循环队列)基于信号量实现基于多生产者多消费者的生产消费模型

在上一节中已经完成了基于单生产者单消费者的生产消费模型,下面就可以考虑如何将其修改为多生产者多消费者

单生产者和单消费者以及多生产者多消费者的共性就是生产者和消费者之间是互斥与同步的关系,从这一方面来看单生产者单消费者的代码也适用于多生产者多消费者,但是在单生产者单消费者的生产消费模型中,如果只有一个生产者,因为在初始化生产者信号量时使用的是循环队列的大小作为资源数量,所以当一个生产者申请了生产者信号量,那么此时生产者信号量就是_maxSize-1,在申请信号量时因为只会有一个生产者而不会有其他生产者申请这个信号量,所以此时不会出现任何问题,但是如果是多个生产者,那么此时一旦生产者信号量足够就会导致多个生产者进入临界区,从而导致数据出现问题。同样,对于多个消费者线程也是如此

所以本质问题就是多生产者多消费者的生产模型中还没有处理同种线程之间的互斥关系

清楚了单生产者单消费者的生产模型存在的问题后,下面针对这个问题提出一种解决方案:在插入数据以及获取数据函数中的申请信号量之后加锁,在更新另外一种线程的信号量之前解锁:

Note

下面的代码使用到前面对互斥锁

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
void pushData(const int &data)
{
    // 申请信号量
    _p_sem.wait();

    {
        MutexGuard guard(_p_lock);
        // 申请成功后插入数据
        _cq[_produce] = data;

        // 更新下标
        _produce++;
        _produce %= _maxSize;
    }
    // 更新消费者信号量
    _c_sem.signal();
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
void popData(int *data)
{
    // 申请信号量
    _c_sem.wait();

    {
        MutexGuard guard(_c_lock);
        // 申请成功后获取数据
        *data = _cq[_consume];

        // 更新下标
        _consume++;
        _consume %= _maxSize;
    }

    // 更新生产者信号量
    _p_sem.signal();
}

下面还有一个问题:为什么加锁要在申请信号量之后而不是申请信号量之前

这里涉及到一个效率问题,如果是在申请信号量之前加锁,那么就会出现多个生产者或者多个消费者进入对应的执行函数后需要先抢锁,如果抢到锁的就会紧接着申请信号量,但是没有抢到锁的就需要继续等待锁,此时这些没有抢到锁的既没有锁也没有信号量,所以下一次就算抢到锁了还要申请信号量。但若是先获取信号量,那么可以保证多个线程只要信号量足够就可以准备进入临界区,下一次只要有锁就可以直接访问临界区,这样在一定程度上保证了多个线程情况下的并发性

(循环队列+泛型)基于信号量实现基于多生产者多消费者的生产消费模型

前面的生产消费模型只是针对数据为int,如果数据为其他类型就会存在类型不匹配的问题,所以为了支持不同的类型,可以考虑使用泛型,以循环队列最终版本为例,将其修改为泛型版本后的整体代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
template <class T>
class CycleQueue
{
public:
    CycleQueue(int val = 5)
        : _maxSize(val), _cq(val), _p_sem(val), _c_sem(0), _produce(0), _consume(0)
    {
    }

    void pushData(const T &data)
    {
        // 申请信号量
        _p_sem.wait();

        {
            MutexGuard guard(_p_lock);
            // 申请成功后插入数据
            _cq[_produce] = data;

            // 更新下标
            _produce++;
            _produce %= _maxSize;
        }
        // 更新消费者信号量
        _c_sem.signal();
    }

    void popData(T *data)
    {
        // 申请信号量
        _c_sem.wait();

        {
            MutexGuard guard(_c_lock);
            // 申请成功后获取数据
            *data = _cq[_consume];

            // 更新下标
            _consume++;
            _consume %= _maxSize;
        }

        // 更新生产者信号量
        _p_sem.signal();
    }

    ~CycleQueue()
    {
    }

private:
    std::vector<T> _cq;
    size_t _maxSize; // 循环队列最大容量
    int _produce;    // 生产者指针
    int _consume;    // 消费者指针
    Sem _c_sem;      // 消费者信号量
    Sem _p_sem;      // 生产者信号量
    Mutex _c_lock;   // 消费者锁
    Mutex _p_lock;   // 生产者锁
};

使用泛型后就可以使用任意类型包括自定义类型作为数据

阻塞队列版本与循环队列版本比较

在阻塞队列版本中,插入数据函数和获取数据函数共用一把互斥锁,此时就导致一次只能一种线程进入阻塞队列,所以结果就是要么生产不消费,要么消费不生产,这在效率上会比较低下,但是在循环队列版本中,因为生产线程和消费线程各自使用各自的锁,所以就保证生产的同时也可以进行消费