程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> 關於C語言 >> 多線程的通信方法總線,多線程通信總線

多線程的通信方法總線,多線程通信總線

編輯:關於C語言

多線程的通信方法總線,多線程通信總線


進程通信方式

1、管道(Pipe):管道可用於具有親緣關系進程間的通信,允許一個進程和另一個與它有共同祖先的進程之間進行通信。
2、命名管道(named pipe):命名管道克服了管道沒有名字的限制,因此,除具有管道所具有的功能外,它還允許無親緣關 系 進程間的通信。命名管道在文件系統中有對應的文件名。命名管道通過命令mkfifo或系統調用mkfifo來創建。
3、信號(Signal):信號是比較復雜的通信方式,用於通知接受進程有某種事件發生,除了用於進程間通信外,進程還可以發送 信號給進程本身;linux除了支持Unix早期信號語義函數sigal外,還支持語義符合Posix.1標准的信號函數sigaction(實際上,該 函數是基於BSD的,BSD為了實現可靠信號機制,又能夠統一對外接口,用sigaction函數重新實現了signal函數)。
4、消息(Message)隊列:消息隊列是消息的鏈接表,包括Posix消息隊列system V消息隊列。有足夠權限的進程可以向隊列中添加消息,被賦予讀權限的進程則可以讀走隊列中的消息。消息隊列克服了信號承載信息量少,管道只能承載無格式字 節流以及緩沖區大小受限等缺
5、共享內存:使得多個進程可以訪問同一塊內存空間,是最快的可用IPC形式。是針對其他通信機制運行效率較低而設計的。往往與其它通信機制,如信號量結合使用,來達到進程間的同步及互斥。
6、內存映射(mapped memory):內存映射允許任何多個進程間通信,每一個使用該機制的進程通過把一個共享的文件映射到自己的進程地址空間來實現它。
7、信號量(semaphore):主要作為進程間以及同一進程不同線程之間的同步手段。
8、套接口(Socket):更為一般的進程間通信機制,可用於不同機器之間的進程間通信。起初是由Unix系統的BSD分支開發出來的,但現在一般可以移植到其它類Unix系統上:Linux和System V的變種都支持套接字。

 

管理通信示例

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <pthread.h>
#define BUFFER_SIZE 16
struct prodcons
{
    int buffer[BUFFER_SIZE];
    pthread_mutex_t lock;  //mutex ensuring exclusive access to buffer
    int readpos,writepos;  //position for reading and writing
    pthread_cond_t notempty;  //signal when buffer is not empty
    pthread_cond_t notfull;  //signal when buffer is not full
};
//initialize a buffer
void init(struct prodcons* b)
{
    pthread_mutex_init(&b->lock,NULL);
    pthread_cond_init(&b->notempty,NULL);
    pthread_cond_init(&b->notfull,NULL);
    b->readpos = 0;
    b->writepos = 0;
}
//store an integer in the buffer
void put(struct prodcons* b, int data)
{
    pthread_mutex_lock(&b->lock);
//wait until buffer is not full
    while((b->writepos+1)%BUFFER_SIZE == b->readpos)
    {
        printf("wait for not full\n");
        pthread_cond_wait(&b->notfull,&b->lock);
    }
    b->buffer[b->writepos] = data;
    b->writepos++;
    b->writepos %= BUFFER_SIZE;
    pthread_cond_signal(&b->notempty); //signal buffer is not empty
    pthread_mutex_unlock(&b->lock);
}
//read and remove an integer from the buffer
int get(struct prodcons* b)
{
    int data;
    pthread_mutex_lock(&b->lock);
//wait until buffer is not empty
    while(b->writepos == b->readpos)
    {
        printf("wait for not empty\n");
        pthread_cond_wait(&b->notempty,&b->lock);
    }
    data=b->buffer[b->readpos];
    b->readpos++;
    b->readpos %= BUFFER_SIZE;
    pthread_cond_signal(&b->notfull);  //signal buffer is not full
    pthread_mutex_unlock(&b->lock);
    return data;
}
#define OVER -1
struct prodcons buffer;
void * producer(void * data)
{
    int n;
    for(n=0; n<50; ++n)
    {
        printf("put-->%d\n",n);
        put(&buffer,n);
    }
    put(&buffer,OVER);
    printf("producer stopped\n");
    return NULL;
}
void * consumer(void * data)
{
    int n;
    while(1)
    {
        int d = get(&buffer);
        if(d == OVER) break;
        printf("get-->%d\n",d);
    }
    printf("consumer stopped\n");
    return NULL;
}
int main()
{
    pthread_t tha,thb;
    void * retval;
    init(&buffer);
    pthread_create(&tha,NULL,producer,0);
    pthread_create(&thb,NULL,consumer,0);
    pthread_join(tha,&retval);
    pthread_join(thb,&retval);
    return 0;
}

運行

 

有名管道通信示例

//createfifo.c
#include<stdio.h>
#include<fcntl.h>
#include<unistd.h>
#include<sys/types.h>
#include<error.h>
#include<sys/stat.h>
#include<stdlib.h>

int main()
{
    if(mkfifo("fifo.demo", 0666) == -1)
    {
        perror("mkfifo");
        exit(-1);
    }
    return 0;
}


//writefifo.c
//先執行writefifo,不執行readfifo,則阻塞在open函數(可以增加參數,實現非阻塞)
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<stdlib.h>
#include<stdio.h>
#include<error.h>
#define N 64
int main()
{
    int fd;
    char buf[N];
    if((fd=open("fifo.demo", O_WRONLY)) == -1)
    {
        perror("open fifo.demo");
        exit(-1);
    }
    while(1)
    {
        if(fgets(buf, N, stdin) != NULL)
        {
            write(fd, buf, strlen(buf));
            if(strncmp(buf,"exit", 4) == 0)//比較前四個字符,buf第五個字符是'\n'
            {
                close(fd);
                exit(1);
            }
        }
    }
}


//readfifo.c
//先執行readfifo,不執行writefifo,則阻塞在open函數(可以增加參數,實現非阻塞)
#include<fcntl.h>
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#define N 64
int main()
{
    int fd;
    char buf[N];
    int n;
    if((fd=open("fifo.demo", O_RDONLY))== -1)
    {
        perror("open fifo.demo");
        exit(-1);
    }
    while(1)
    {
        if((n = read(fd, buf, N)) >= 0)
        {
            if(n == 0)
            {
                exit(1);
            }
            write(1, buf, n);//讀多少,寫多少
        }
    }
}

運行

 

信號量通信示例

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/sem.h>

union semun
{
    int val;
    struct semid_ds *buf;
    unsigned short *arry;
};

static int sem_id = 0;
static int set_semvalue();
static void del_semvalue();
static int semaphore_p();
static int semaphore_v();

int main(int argc, char *argv[])
{
    char message = 'X';
    int i = 0;

    //創建信號量
    sem_id = semget((key_t)1234, 1, 0666 | IPC_CREAT);

    if(argc > 1)
    {
        //程序第一次被調用,初始化信號量
        if(!set_semvalue())
        {
            fprintf(stderr, "Failed to initialize semaphore\n");
            exit(EXIT_FAILURE);
        }
        //設置要輸出到屏幕中的信息,即其參數的第一個字符
        message = argv[1][0];
        sleep(2);
    }
    for(i = 0; i < 10; ++i)
    {
        //進入臨界區
        if(!semaphore_p())
            exit(EXIT_FAILURE);
        //向屏幕中輸出數據
        printf("%c", message);
        //清理緩沖區,然後休眠隨機時間
        fflush(stdout);
        sleep(rand() % 3);
        //離開臨界區前再一次向屏幕輸出數據
        printf("%c", message);
        fflush(stdout);
        //離開臨界區,休眠隨機時間後繼續循環
        if(!semaphore_v())
            exit(EXIT_FAILURE);
        sleep(rand() % 2);
    }

    sleep(10);
    printf("\n%d - finished\n", getpid());

    if(argc > 1)
    {
        //如果程序是第一次被調用,則在退出前刪除信號量
        sleep(3);
        del_semvalue();
    }
    exit(EXIT_SUCCESS);
}

static int set_semvalue()
{
    //用於初始化信號量,在使用信號量前必須這樣做
    union semun sem_union;

    sem_union.val = 1;
    if(semctl(sem_id, 0, SETVAL, sem_union) == -1)
        return 0;
    return 1;
}

static void del_semvalue()
{
    //刪除信號量
    union semun sem_union;

    if(semctl(sem_id, 0, IPC_RMID, sem_union) == -1)
        fprintf(stderr, "Failed to delete semaphore\n");
}

static int semaphore_p()
{
    //對信號量做減1操作,即等待P(sv)
    struct sembuf sem_b;
    sem_b.sem_num = 0;
    sem_b.sem_op = -1;//P()
    sem_b.sem_flg = SEM_UNDO;
    if(semop(sem_id, &sem_b, 1) == -1)
    {
        fprintf(stderr, "semaphore_p failed\n");
        return 0;
    }
    return 1;
}

static int semaphore_v()
{
    //這是一個釋放操作,它使信號量變為可用,即發送信號V(sv)
    struct sembuf sem_b;
    sem_b.sem_num = 0;
    sem_b.sem_op = 1;//V()
    sem_b.sem_flg = SEM_UNDO;
    if(semop(sem_id, &sem_b, 1) == -1)
    {
        fprintf(stderr, "semaphore_v failed\n");
        return 0;
    }
    return 1;
}

運行

 

消息隊列通信示例

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>

#define BUFSZ 512
#define TYPE 100

struct msgbuf
{
    long mtype;
    char mtext[BUFSZ];
};

int main()
{
    int qid, len;
    key_t key;
    struct msgbuf msg;
    /*根據不同的路徑和關鍵字產生key*/
    if((key = ftok(".", 'a')) == -1)
    {
        perror("ftok");
        exit(-1);
    }
    /*創建消息隊列*/
    if((qid = msgget(key, IPC_CREAT|0666)) == -1)
    {
        perror("mesgget");
        exit(-1);
    }
    printf("opened queue %d\n", qid);
    printf("please input the message to queue:");
    if((fgets((&msg)->mtext, BUFSZ, stdin)) == NULL)
    {
        puts("no message\n");
        exit(-1);
    }
    msg.mtype = TYPE;
    len = strlen(msg.mtext) + 1;
    /*添加消息到消息隊列*/
    if(msgsnd(qid, &msg, len, 0) < 0)
    {
        perror("msgsnd");
        exit(-1);
    }
    /*從消息隊列讀取消息*/
    if(msgrcv(qid, &msg, BUFSZ, 0, 0) < 0)
    {
        perror("msgrcv");
        exit(-1);
    }
    printf("message is :%s\n",msg.mtext);
    /*從系統中刪除消息隊列*/
    if(msgctl(qid, IPC_RMID, NULL) < 0)
    {
        perror("msgctl");
        exit(-1);
    }
    return 0;
}

運行

 

共享內存示例

//writer.c  
#include <stdio.h>  
#include <unistd.h>  
#include <signal.h>  
#include <stdlib.h>  
#include <errno.h>  
#include <sys/ipc.h>  
#include <sys/types.h>  
#include <sys/shm.h>  
#include <string.h>  
  
#define N 1024  
  
typedef struct  
{  
    pid_t pid;  //最先打開的進程會把自己的pid記錄在此,對其進行初始化  
    char buf[N];  
}SHM;  
  
void handler(int signo)   //用於發給喚醒pause  
{  
    
}  
  
int main()  
{  
    key_t key;  
    int shmid;  
    SHM *shmaddr;  
    pid_t peerpid;  
  
    signal(SIGUSR1, handler);  
  
    if ((key = ftok(".", 'a')) == -1)  
    {  
        perror("ftok");  
        exit(-1);  
    }  
   
//創建共享內存  
    if ((shmid = shmget(key, sizeof(SHM), 0666 | IPC_CREAT | IPC_EXCL)) == -1)//IPC_EXCL同open的EXCL,用於判斷是哪個進程最先打開  
    {  
        if (errno == EEXIST)  
        {  
            shmid = shmget(key, sizeof(SHM), 0666);  
      
            if ((shmaddr = (SHM *)shmat(shmid, NULL, 0)) == (SHM *)-1)  
            {  
                perror("shmat");  
                exit(-1);  
            }  
            peerpid = shmaddr->pid;  
            shmaddr->pid = getpid();  
            kill(peerpid, SIGUSR1);  
        }  
        else  
        {  
            perror("shmget");  
            exit(-1);  
        }  
    }  
    else //first  
    {  
        if ((shmaddr = (SHM *)shmat(shmid, NULL, 0)) == (SHM *)-1)//添加映射  
        {  
            perror("shmat");  
            exit(-1);  
        }  
  
        shmaddr->pid = getpid();  
        pause();  
        peerpid = shmaddr->pid;  
    }  
  
    while (1)  
    {  
        fgets(shmaddr->buf, N, stdin);  
        kill(peerpid, SIGUSR1);  
        if (strncmp(shmaddr->buf, "quit", 4) == 0)  
            break;  
        pause();  
    }
    
  
    if (shmdt(shmaddr) == -1) //解除映射  
    {  
        perror("shmdt");  
        exit(-1);  
    }  
  
    if (shmctl(shmid, IPC_RMID, NULL) == -1)//釋放共享內存  
    {  
        perror("RM");  
        exit(-1);  
    }
  
    exit(0);  
}


//reader.c  
#include <stdio.h>  
#include <unistd.h>  
#include <signal.h>  
#include <stdlib.h>  
#include <errno.h>  
#include <sys/ipc.h>  
#include <sys/types.h>  
#include <sys/shm.h>  
#include <string.h>  
  
#define N 1024  
  
typedef struct  
{  
    pid_t pid;  
    char buf[N];  
}SHM;  
  
void handler(int signo)  
{  
}  
  
int main()  
{  
    key_t key;  
    int shmid;  
    SHM *shmaddr;  
    pid_t peerpid;  
  
    signal(SIGUSR1, handler);  
  
    if ((key = ftok(".", 'a')) == -1)  
    {  
        perror("ftok");  
        exit(-1);  
    }  
  
    if ((shmid = shmget(key, sizeof(SHM), 0666 | IPC_CREAT | IPC_EXCL)) == -1)  
    {  
        if (errno == EEXIST)  
        {  
            shmid = shmget(key, sizeof(SHM), 0666);  
      
            if ((shmaddr = (SHM *)shmat(shmid, NULL, 0)) == (SHM *)-1)  
            {  
                perror("shmat");  
                exit(-1);  
            }  
            peerpid = shmaddr->pid;  
            shmaddr->pid = getpid();  
            kill(peerpid, SIGUSR1);  
        }  
        else  
        {  
            perror("shmget");  
            exit(-1);  
        }  
    }  
    else //first  
    {  
        if ((shmaddr = (SHM *)shmat(shmid, NULL, 0)) == (SHM *)-1)  
        {  
            perror("shmat");  
            exit(-1);  
        }  
  
        shmaddr->pid = getpid();  
        pause();  
        peerpid = shmaddr->pid;  
    }  
  
    while (1)  
    {  
        pause();  
        printf("%s", shmaddr->buf);  
        if (strncmp(shmaddr->buf, "quit", 4) == 0)  
            break;  
        kill(peerpid, SIGUSR1);  
    }  
  
    if (shmdt(shmaddr) == -1)  
    {  
        perror("shmdt");  
        exit(-1);  
    }  
  
    exit(0);  
}

運行

 

信號通信示例

#include <stdio.h>
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/wait.h>

pid_t pid;
void conductor_handler(int signo);
void driver_handler(int signo);

int main()
{
    if((pid = fork()) < 0)
    {
        perror("fork error.\n");
    }
    else if(pid == 0)
    {
        signal(SIGTSTP,SIG_IGN);
        signal(SIGINT,conductor_handler);
        signal(SIGQUIT,conductor_handler);
        signal(SIGUSR1,conductor_handler);
        while(1)
        {
            pause();
        }
    }
    else
    {
        signal(SIGINT,SIG_IGN);
        signal(SIGQUIT,SIG_IGN);
        signal(SIGTSTP,driver_handler);
        signal(SIGUSR1,driver_handler);
        signal(SIGUSR2,driver_handler);
        while(1)
        {
            pause();
        }
    }
    return 0;
}

void conductor_handler(int signo)
{
    switch(signo)
    {
        case SIGINT :
            kill(getppid(),SIGUSR1);
            break;
        case SIGQUIT:
            kill(getppid(),SIGUSR2);
            break;
        case SIGUSR1:
            printf("Final station ,all get off.\n");
            exit(0);
    }
}

void driver_handler(int signo)
{
    switch(signo)
    {
        case SIGTSTP :
            kill(pid,SIGUSR1);
            wait(NULL);
            exit(0);
        case SIGUSR1 :
            printf("bus will run...\n");
            break;
        case SIGUSR2 :
            printf("bus will stop...\n");
            break;
    }
}

運行

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved