溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

生產(chǎn)者與消費(fèi)者模式(線程的同步與互斥)

發(fā)布時(shí)間:2020-07-05 11:52:21 來(lái)源:網(wǎng)絡(luò) 閱讀:1141 作者:小止1995 欄目:編程語(yǔ)言

死鎖產(chǎn)生的四個(gè)條件:

1、互斥使用(資源獨(dú)占) 

一個(gè)資源每次只能給一個(gè)進(jìn)程使用 

.2、不可強(qiáng)占(不可剝奪) 

資源申請(qǐng)者不能強(qiáng)行的從資源占有者手中奪取資源,資源只能由占有者自愿釋放 

.3、請(qǐng)求和保持(部分分配,占有申請(qǐng)) 

一個(gè)進(jìn)程在申請(qǐng)新的資源的同時(shí)保持對(duì)原有資源的占有(只有這樣才是動(dòng)態(tài)申請(qǐng),動(dòng)態(tài)分配) 

.4、循環(huán)等待 

存在一個(gè)進(jìn)程等待隊(duì)列 

{P1 , P2 , … , Pn}, 

其中P1等待P2占有的資源,P2等待P3占有的資源,…,Pn等待P1占有的資源,形成一個(gè)進(jìn)程等待環(huán)路


生產(chǎn)者:生產(chǎn)數(shù)據(jù)

消費(fèi)者:消費(fèi)數(shù)據(jù)

提供場(chǎng)所:緩沖區(qū),eg:超市

生產(chǎn)者消費(fèi)者特點(diǎn):三種關(guān)系,兩類人,一個(gè)場(chǎng)所

三種關(guān)系指的是:生產(chǎn)者與生產(chǎn)者之間是互斥關(guān)系

        消費(fèi)者與消費(fèi)者之間是互斥關(guān)系

        生產(chǎn)者與消費(fèi)者之間是同步與互斥關(guān)系

兩類人:生產(chǎn)者,消費(fèi)者

一個(gè)場(chǎng)所:存儲(chǔ)數(shù)據(jù)(此處用帶頭單鏈表實(shí)現(xiàn))

單生產(chǎn)者單消費(fèi)者模式:此例取數(shù)據(jù)方式為L(zhǎng)IFO后進(jìn)先出,所取數(shù)據(jù)為最后一個(gè)生產(chǎn)的數(shù)據(jù)(也可選擇所取數(shù)據(jù)為最先生產(chǎn)的數(shù)據(jù),可自行選擇)

互斥鎖相關(guān)函數(shù):

#include <pthread.h>

int pthread_mutex_destroy(pthread_mutex_t *mutex);

int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;


int pthread_mutex_lock(pthread_mutex_t *mutex);

int pthread_mutex_trylock(pthread_mutex_t *mutex);//非阻塞形式獲取鎖

int pthread_mutex_unlock(pthread_mutex_t *mutex);

//1.使用互斥鎖實(shí)現(xiàn)
#include<stdio.h>
#include<malloc.h>
#include<pthread.h>
typedef int  _dataType_;
typedef int*  _dataType_p_;
typedef struct _node
{
    _dataType_ data;
    struct _node* next;
}node,*nodep,**nodepp;
nodep head=NULL;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
nodep  buyNode(_dataType_ val)
{
    nodep tmp=(nodep)malloc(sizeof(node));
    if(tmp!=NULL)
    {
        tmp->data=val;
        tmp->next=NULL;
        return tmp;
    }
    return NULL;
}
void init(nodepp head)
{
    *head=buyNode(0);
}
void push_list(nodep head,_dataType_ val)
{
    nodep tmp=buyNode(val);
    tmp->next=head->next;
    head->next=tmp;
}
int pop_list(nodep head,_dataType_p_ pval)
{
    if(head->next==NULL)
        return -1;
    nodep del=head->next;
    *pval=del->data;
    head->next=del->next;
    free(del);
    return 0;
}
void* product(void* arg)
{
    _dataType_  i=0;
    while(1)
    {
        sleep(1);
        pthread_mutex_lock(&mutex);
        push_list(head,i++);
        pthread_mutex_unlock(&mutex);
    }
    pthread_exit((void*)1);
}
void* consumer(void* arg)
{
    _dataType_ val=0;
    while(1)
    {
        sleep(1);
        pthread_mutex_lock(&mutex);
        if(pop_list(head,&val)==-1)
        {
            pthread_mutex_unlock(&mutex);
            continue;
        }
        printf("data:%d\n",val);
        pthread_mutex_unlock(&mutex);
    }
    pthread_exit((void*)1);
}
int main()
{
    pthread_t tid1,tid2;
    init(&head);
    pthread_create(&tid1,NULL,product,NULL);
    pthread_create(&tid2,NULL,consumer,NULL);
    pthread_join(tid1,NULL);
    pthread_join(tid2,NULL);
    free(head);
    pthread_mutex_destroy(&mutex);
    return 0;
}

//2.使用條件變量實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式

條件變量:基于互斥鎖實(shí)現(xiàn)同步與互斥

個(gè)條件變量總是和一個(gè)Mutex搭配使用。 

一個(gè)線程可以調(diào)用pthread_cond_wait在一個(gè)Condition Variable上阻塞等待,這個(gè)函數(shù)做以下三步操作:
1. 釋放Mutex
2. 阻塞等待
3. 當(dāng)被喚醒時(shí),重新獲得Mutex并返回

條件變量相關(guān)函數(shù):

#include <pthread.h>

int pthread_cond_destroy(pthread_cond_t *cond);

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;



int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,

             const struct timespec *restrict abstime);

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

pthread_cond_timedwait函數(shù)還有一個(gè)額外的參數(shù)可以設(shè)定等待超時(shí),如果到達(dá)了abstime所指
定的時(shí)刻仍然沒有別的線程來(lái)喚醒當(dāng)前線程,就返回ETIMEDOUT



int pthread_cond_broadcast(pthread_cond_t *cond);

int pthread_cond_signal(pthread_cond_t *cond);

一個(gè)線程可以調(diào)用pthread_cond_signal喚醒在某個(gè)Condition Variable上等待的另一個(gè)線程,也可以調(diào)用pthread_cond_broadcast喚醒在這個(gè)Condition Variable上等待的所有線程。

#include<stdio.h>
#include<malloc.h>
#include<pthread.h>
typedef int  _dataType_;
typedef int*  _dataType_p_;
typedef struct _node
{
    _dataType_ data;
    struct _node* next;
}node,*nodep,**nodepp;
nodep head=NULL;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
nodep  buyNode(_dataType_ val)
{
    nodep tmp=(nodep)malloc(sizeof(node));
    if(tmp!=NULL)
    {
        tmp->data=val;
        tmp->next=NULL;
        return tmp;
    }
    return NULL;
}
void init(nodepp head)
{
    *head=buyNode(0);
}
void push_list(nodep head,_dataType_ val)
{
    nodep tmp=buyNode(val);
    tmp->next=head->next;
    head->next=tmp;
}
int pop_list(nodep head,_dataType_p_ pval)
{
    if(head->next==NULL)
        return -1;
    nodep del=head->next;
    *pval=del->data;
    head->next=del->next;
    free(del);
    return 0;
}
void* product(void* arg)
{
    _dataType_  i=0;
    while(1)
    {
        sleep(1);
        pthread_mutex_lock(&mutex);
        push_list(head,i++);
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
    }
    pthread_exit((void*)1);
}
void* consumer(void* arg)
{
    _dataType_ val=0;
    while(1)
    {
        sleep(1);
        pthread_mutex_lock(&mutex);
        if(pop_list(head,&val)==-1)
            pthread_cond_wait(&cond,&mutex);
        printf("data:%d\n",val);
        pthread_mutex_unlock(&mutex);
    }
    pthread_exit((void*)1);
}
int main()
{
    pthread_t tid1,tid2;
    init(&head);
    pthread_create(&tid1,NULL,product,NULL);
    pthread_create(&tid2,NULL,consumer,NULL);
    pthread_join(tid1,NULL);
    pthread_join(tid2,NULL);
    free(head);
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
    return 0;
}

//3.使用環(huán)形buf存儲(chǔ)數(shù)據(jù),信號(hào)量的使用

信號(hào)量相關(guān)函數(shù):

信號(hào)量(Semaphore)Mutex類似,表示可用資源的數(shù)量,Mutex不同的是這個(gè)數(shù)量可以大于1

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

int sem_destroy(sem_t *sem);

int sem_wait(sem_t *sem);//類似P操作

int sem_trywait(sem_t *sem);

int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

int sem_post(sem_t *sem);//類似V操作

#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>
#define _SEM_PRO_ 20
#define _SEM_COM_ 0
typedef int  _dataType_;
_dataType_ blank[_SEM_PRO_];
sem_t sem_product;
sem_t sem_consumer;
void* product(void* arg)
{
    int index=0;
    int count=0;
    while(1)
    {
        sleep(rand()%5);
        sem_wait(&sem_product);
        blank[index++]=count++;
        sem_post(&sem_consumer);
        index%=_SEM_PRO_;
    }
    pthread_exit((void*)1);
}
void* consumer(void* arg)
{
    int index=0;
    while(1)
    {
        sem_wait(&sem_consumer);
        printf("data:%d\n",blank[index++]);
        sem_post(&sem_product);
        index%=_SEM_PRO_;
    }
    pthread_exit((void*)1);
}
int main()
{
    pthread_t tid1,tid2;
    sem_init(&sem_product,0,20);
    sem_init(&sem_consumer,0,0);
    pthread_create(&tid1,NULL,product,NULL);
    pthread_create(&tid2,NULL,consumer,NULL);
    pthread_join(tid1,NULL);
    pthread_join(tid2,NULL);
    sem_destroy(&sem_product);
    sem_destroy(&sem_consumer);
    return 0;
}

//4.多生產(chǎn)者,多消費(fèi)者模式

#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>
#define _SEM_PRO_ 20
#define _SEM_COM_ 0
typedef int  _dataType_;
_dataType_ blank[_SEM_PRO_];
sem_t sem_product;
sem_t sem_consumer;
pthread_mutex_t mutex_product=PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex_consumer=PTHREAD_MUTEX_INITIALIZER;

void* product(void* arg)
{
    int index=0;
    int count=0;
    while(1)
    {
        sleep(rand()%5);
        sem_wait(&sem_product);
        pthread_mutex_lock(&mutex_product);
        printf("%d thread id doing\n",(int)arg);
        blank[index++]=count++;
        index%=_SEM_PRO_;
        pthread_mutex_unlock(&mutex_product);
        sem_post(&sem_consumer);
    }
    pthread_exit((void*)1);
}
void* consumer(void* arg)
{
    int index=0;
    while(1)
    {
        sem_wait(&sem_consumer);
        pthread_mutex_lock(&mutex_consumer);
        printf("%d thread is consumer,data:%d\n",(int)arg,blank[index++]);
        index%=_SEM_PRO_;
        pthread_mutex_unlock(&mutex_consumer);
        sem_post(&sem_product);
    }
    pthread_exit((void*)1);
}
int main()
{
    pthread_t tid1,tid2,tid3,tid4;
    sem_init(&sem_product,0,20);
    sem_init(&sem_consumer,0,0);
    pthread_create(&tid1,NULL,product,(void*)1);
    pthread_create(&tid2,NULL,consumer,(void*)2);
    pthread_create(&tid3,NULL,product,(void*)3);
    pthread_create(&tid4,NULL,consumer,(void*)4);
    pthread_join(tid1,NULL);
    pthread_join(tid2,NULL);
    pthread_join(tid3,NULL);
    pthread_join(tid4,NULL);
    sem_destroy(&sem_product);
    sem_destroy(&sem_consumer);
    pthread_mutex_destroy(&mutex_product);
    pthread_mutex_destroy(&mutex_consumer);
}

運(yùn)行結(jié)果顯示:

生產(chǎn)者與消費(fèi)者模式(線程的同步與互斥)

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI