EPOLL(C/S模型)实现I/O复用多进程聊天室,通过共享内存、socketpair实现父子进程通信,通过信号量回收进程-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

这里只展示了server端client端可以用之前的poll写的。
每个client我们fork一个子进程用epoll来实现它的I/O复用。
非常巧妙的使用共享内存通过给每个client编号以及BUFFER_SIZE保存需要广播和接受的内容因为有了编号所以父子进程的socketpair通信我们只要传编号就可以表示这个client需要广播的内容了。
最后就是里面注册的信号量父进程不能直接说关闭就关闭否则没有及时关闭的子进程会变成僵尸进程所以我们通过注册的信号量来让系统走我们把所以子进程都关闭再关闭自己的逻辑。

缺点
当一部分client频繁发送时容易出现所处的共享内存上的buffer未发出但是新的又来了所以可能会导致吞消息的现象这时我们需要设置缓冲区来解决下次一定。

#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <set>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/shm.h>
#include <signal.h>

#define  USER_LIMIT 5  //最大用户数量
#define  BUFFER_SIZE 1024  //读缓冲区的大小
#define  FD_LIMIT 65535 //文件描述符数量限制
#define  MAX_EVENT_NUMBER 1024
#define  PROCESS_LIMIT 65536+65536
//客户数据  socket地址、待写到客户端的数据的位置、从客户端读入的数据
struct client_data
{
    sockaddr_in address;
    int connfd;      ///socket文件描述符
    pid_t pid;      //处理这个连接的子进程pid
    int pipefd[2];    //与父进程通信的管道
};
static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char* share_men = 0;
//客户连接的数组进程用客户连接编号来索引获得相关的数据
client_data* users = 0;
//子进程和客户的连接映射关系表用进程的pid来索引数据获取该进程处理的客户连接编号
int* sub_process = 0;
int user_counet = 0 ;
std::set<int> nost;
//当前客户数量
std::set<int> curst;
bool stop_child = false;

int setnoblocking(int fd)
{
    int old_option = fcntl(fd,F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd,F_SETFL,new_option);
    return old_option;
}
void addfd(int epollfd,int fd)
{
    epoll_event event ;
    event.data.fd = fd ;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
    setnoblocking(fd);
}
void sig_handler(int sig)
{
    int save_errno = errno;
    int msg = sig;
    send(sig_pipefd[1],(char *)&msg,1,0);
    errno = save_errno;
}
void addsig(int sig , void(*handler)(int) ,bool restart = true )
{
    struct  sigaction sa ;
    memset(&sa,'\0',sizeof(sa));
    sa.sa_handler = handler;
    if ( restart )
        sa.sa_flags |= SA_RESTART;
    sigfillset(&sa.sa_mask);
    assert(sigaction(sig,&sa,NULL) != 1 );
}
void del_resource()
{
    close(sig_pipefd[0]);
    close(sig_pipefd[1]);
    close(listenfd);
    close(epollfd);
    shm_unlink(shm_name);
    delete [] users;
    delete sub_process;
}
void child_term_handler(int sig)
{
    stop_child = true;
}
//子进程运行的函数参数idx指出该子进程处理的客户连接的编号user是保存所有客户连接数据的数组参数share_men指出共享内存的起始位置
int run_child(int idx,client_data* users,char* share_mem)
{
    epoll_event events[MAX_EVENT_NUMBER];
    //子进程使用I/O复用技术来同时监控两个文件描述符:客户连接socket、与父进程通信的管道文件描述符
    int child_epollfd = epoll_create(5);
    assert(child_epollfd != -1);
    int connfd = users[idx].connfd;
    addfd(child_epollfd,connfd);
    int pipefd = users[idx].pipefd[1];
    addfd(child_epollfd,pipefd);
    int ret;
    //子进程设置自己的信号处理函数
    addsig(SIGTERM,child_term_handler,false);
    while ( !stop_child )
    {
        int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);
        if ((number < 0) && (errno != EINTR))
        {
            printf("epoll failure\n");
            break;
        }
        for (int i = 0 ; i < number ; i++ )
        {
            int sockfd = events[i].data.fd;
            //本子进程负责的客户连接有数据到达
            if ( (sockfd == connfd ) && ( events[i].events & EPOLLIN) )
            {
                memset(share_mem + idx*BUFFER_SIZE ,'\0',BUFFER_SIZE);
                //将客户数据读取到对应的读缓存中该读缓存时共享内存的一段
                ret = recv(connfd,share_mem+idx*BUFFER_SIZE,BUFFER_SIZE-1,0);
                if ( ret < 0 )
                {
                    if ( errno != EAGAIN )
                    {
                        stop_child = true;
                    }
                }
                else if ( ret == 0 )
                {
                    stop_child = true;
                }else
                {
                    send(pipefd,(char *)&idx,sizeof(idx),0);
                }
            }else if ( (sockfd == pipefd) && (events[i].events & EPOLLIN) )
            {
                int client = 0 ;
                //接受主进程发来的数据
                ret = recv(sockfd,(char *)&client , sizeof(client),0);
                if ( ret < 0 )
                {
                    if ( errno != EAGAIN )
                    {
                        stop_child = true;
                    }
                }else if ( ret == 0 )
                {
                    stop_child = true;
                }else
                {
                    send(connfd,share_mem+client*BUFFER_SIZE,BUFFER_SIZE,0);
                }
            }
        }
    }
    close(connfd);
    close(pipefd);
    close(child_epollfd);
    return 0;
}
int main()
{
    const char* ip = "192.168.174.129" ;
    int port = 5050 ;

    int ret = 0;
    sockaddr_in address;
    bzero(&address,sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET,ip,&address.sin_addr);
    address.sin_port = htons(port);

    listenfd = socket(PF_INET,SOCK_STREAM,0);
    assert(listenfd >=0 );

    ret = bind(listenfd,(struct sockaddr *)&address,sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd,5);
    assert(ret != -1);

    user_counet = 0;
    users = new client_data [USER_LIMIT+1];
    sub_process = new int[PROCESS_LIMIT];
    for (int i = 1 ; i < PROCESS_LIMIT ; i++ )
        sub_process[i] = -1;
    for (int i = 0 ; i < USER_LIMIT ; i++ )
        nost.insert(i);
    epoll_event ev;
    epoll_event events[MAX_EVENT_NUMBER];
    epollfd = epoll_create(USER_LIMIT);
    assert(epollfd != -1);
    addfd(epollfd,listenfd);

    ret = socketpair(PF_UNIX,SOCK_STREAM,0,sig_pipefd);
    assert(ret != -1);
    setnoblocking(sig_pipefd[1]);
    addfd(epollfd,sig_pipefd[0]);

    addsig(SIGCLD,sig_handler);
    addsig(SIGTERM,sig_handler);
    addsig(SIGINT,sig_handler);
    addsig(SIGPIPE,SIG_IGN);
    bool stop_server = false;
    bool terminate = false;

    //创建共享内存作为客户socket连接的读缓存
    shmfd = shm_open(shm_name,O_CREAT|O_RDWR,0666);
    assert(shmfd != -1);
    ret = ftruncate(shmfd,USER_LIMIT*BUFFER_SIZE);
    assert(ret != -1);

    share_men = (char *) mmap(NULL,USER_LIMIT*BUFFER_SIZE,PROT_READ|
                                PROT_WRITE,MAP_SHARED,shmfd,0);
    assert(share_men != MAP_FAILED);
    close(shmfd);

    while ( !stop_server )
    {
        int number = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
        if ( (number < 0 ) && ( errno != EINTR ) )
        {
            printf("epoll falure\n");
            break;
        }
        for (int i  = 0 ; i < number; i++ ){
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd,(struct sockaddr*)&client_address
                        ,&client_addrlength);
                if ( connfd < 0 ) {
                    printf("errno is : %d \n", errno);
                    continue;
                }
                if ( curst.size() >= USER_LIMIT )
                {
                    const char * info = "too many users\n";
                    printf("%s",info);
                    send(connfd,info, strlen(info),0);
                    close(connfd);
                    continue;
                }
                user_counet = *nost.begin();
                nost.erase(nost.begin());
                curst.insert(user_counet);
                users[user_counet].address = client_address;
                users[user_counet].connfd = connfd;
                ret = socketpair(PF_UNIX,SOCK_STREAM,0,users[user_counet].pipefd);
                assert(ret != -1);
                pid_t  pid = fork();
                if ( pid < 0 )
                {
                    close(connfd);
                    continue;
                }else if ( pid == 0 )
                {
                    close(epollfd);
                    close(listenfd);
                    close(users[user_counet].pipefd[0]);
                    close(sig_pipefd[0]);
                    close(sig_pipefd[1]);
                    run_child(user_counet,users,share_men);
                    munmap((void*)share_men,USER_LIMIT*BUFFER_SIZE);
                    exit(0);
                }else
                {
                    close(connfd);
                    close(users[user_counet].pipefd[1]);
                    addfd(epollfd,users[user_counet].pipefd[0]);
                    users[user_counet].pid = pid;
                    printf("client %d join , now curclient %d \n",user_counet,curst.size());
                    sub_process[pid] = user_counet;
                }

            }else if ( (sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN))
            {
                int sig;
                char signals[1024];
                printf("recv sig !!!\n");
                ret = recv(sig_pipefd[0],signals,sizeof(signals),0);
                if ( ret == -1 )
                {
                    continue;
                }
                else if ( ret == 0 )
                {
                    continue;
                }
                else
                {
                    for (int k = 0 ; k < ret; ++k )
                    {
                        switch (signals[k])
                        {
                            case SIGCHLD:
                            {
                            pid_t pid;
                            int stat;
                                while ( (pid = waitpid(-1,&stat,WNOHANG)) > 0 )
                                {
                                    //用子进程的pid取需要关闭的客户连接id
                                    int del_user = sub_process[pid];
                                    sub_process[pid] = -1;
                                    if ( ( del_user < 0 ) || ( del_user > USER_LIMIT ) )
                                    {
                                        continue;
                                    }
                                    printf("close : %d \n",del_user);
                                    nost.insert(del_user);
                                    curst.erase(del_user);
                                    //清除第del_user个客户端连接使用的相关数据
                                    epoll_ctl(epollfd,EPOLL_CTL_DEL,
                                              users[del_user].pipefd[0],0);
                                    close(users[del_user].pipefd[0]);
                                    users[del_user] = users[USER_LIMIT];
                                    sub_process[users[del_user].pid] = del_user;
                                }
                                if ( terminate && curst.empty() )
                                {
                                    stop_server = true;
                                }
                                break;
                            }
                            case SIGTERM:
                            case SIGINT:
                            {
                                //结束服务器程序
                                printf("kill all the child now\n");
                                if ( curst.empty() )
                                {
                                    stop_server = true ;
                                    break;
                                }
                                for (auto & j : curst)
                                {
                                    int pid = users[j].pid;
                                    kill(pid,SIGTERM);
                                }
                                terminate = true ;
                                break;
                            }
                            default:
                            {
                                break;
                            }
                        }
                    }
                }
            }
            //某个子进程向父进程写了数据
            else if ( events[i].events & EPOLLIN )
            {
                int child = 0;
                //读取管道数据child变量记录了是哪个客户连接有数据可达
                ret = recv(sockfd,(char *)&child,sizeof(child),0);
                //printf("read data from child accross pipe\n");
                if ( ret == -1 )
                {
                    continue;
                }else if ( ret == 0 )
                {
                    continue;
                }
                else
                {
                    printf("read data from child accross pipe %s \n",share_men+child*BUFFER_SIZE);
                    //向除了负责第child个客户连接的子进程之外的其他进程发送消息通知他们的客户端有数据要写
                    for (auto j : curst)
                    {
                        if ( users[j].pipefd[0] != sockfd )
                        {
                            printf("send data to child accross pipe\n");
                            send(users[j].pipefd[0],(char* )&child,sizeof(child),0);
                        }
                    }
                }
            }
        }



    }

    del_resource();
    return 0;
}

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

“EPOLL(C/S模型)实现I/O复用多进程聊天室,通过共享内存、socketpair实现父子进程通信,通过信号量回收进程-CSDN博客” 的相关文章