moeyy


Linux Network Programming Series (10) -- Basic Usage of epoll

1. Four IO Models in Network Programming#

  • Blocking IO model: sockets are blocking by default, so an IO call does not return until the operation completes;
  • Non-blocking IO model: the socket is set to non-blocking with fcntl, so an IO call returns immediately even when no data is ready. The application has to keep polling the kernel and processes the data once a call succeeds;
  • Multiplexing model: event-driven IO, in which the application handles a descriptor only after an event is detected on it; typically implemented with select, poll, or epoll;
  • Asynchronous IO model: the IO call returns immediately after the operation is initiated and the process carries on with other work. The kernel waits for the data, copies it into user memory, and then signals the process that the IO operation is complete;
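
The non-blocking model in the list above can be sketched in a few lines. This is a minimal illustration rather than code from the series: the helper names set_nonblocking and try_read are made up for this example, and only fcntl and read are real system calls.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>

// Switch a descriptor to non-blocking mode; returns true on success.
// (Hypothetical helper name, not part of the CTcp class.)
bool set_nonblocking(int fd)
{
    assert(fd >= 0);
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return false;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}

// Try one read without waiting.
// Returns the byte count (0 means end of file), or -1 with errno set.
// errno == EAGAIN / EWOULDBLOCK means "nothing is ready yet, poll again".
ssize_t try_read(int fd, char* buf, size_t len)
{
    ssize_t n;
    do {
        n = read(fd, buf, len);
    } while (n == -1 && errno == EINTR);  // retry if interrupted by a signal
    return n;
}
```

A caller that uses only these two helpers has to call try_read in a loop, which is exactly the busy polling that the multiplexing model avoids.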

2. epoll Function#

2.1 Two Working Modes of epoll#

2.1.1 LT Mode (also known as Level Triggered, similar to select/poll):#

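Level-triggered mode is the default. As long as a registered file descriptor remains ready (for example, unread data is still in its receive buffer), every call to epoll_wait reports it again, until the application has finished processing it.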

2.1.2 ET Mode (also known as Edge Triggered, must set the socket to non-blocking):#

In this mode, epoll notifies the application only when the state of a file descriptor changes, for example when new data arrives. The descriptor stays registered, but the event is not reported again until the next change. If the application does not read everything that is available (that is, until read returns EAGAIN), the remaining data stays in the buffer with no further notification, so the connection can appear to stall. This is why the socket must be non-blocking in ET mode.
The application therefore usually maintains its own table of fds, records the status information obtained from epoll_wait in this table, and can then traverse the table to operate on the busy fds.
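
The difference between the two modes can be observed with a small experiment. The sketch below is an illustration under simple assumptions (a pipe instead of a socket, and a made-up function name count_reports): it makes one byte readable, never reads it, and counts how many epoll_wait calls report the descriptor.

```cpp
#include <cassert>
#include <cstdint>
#include <sys/epoll.h>
#include <unistd.h>

// Make one byte readable on a pipe, register the read end with epoll, then
// call epoll_wait `rounds` times WITHOUT reading the byte.
// Returns how many of those calls reported the descriptor.
int count_reports(bool edge_triggered, int rounds)
{
    int p[2];
    int rc = pipe(p);
    assert(rc == 0);
    int epfd = epoll_create1(0);
    assert(epfd >= 0);

    uint32_t mask = EPOLLIN;
    if (edge_triggered)
        mask |= EPOLLET;

    struct epoll_event ev = {};
    ev.events = mask;
    ev.data.fd = p[0];
    rc = epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &ev);
    assert(rc == 0);

    ssize_t w = write(p[1], "x", 1);
    assert(w == 1);

    int reports = 0;
    for (int i = 0; i < rounds; ++i) {
        struct epoll_event out;
        // Timeout 0: return immediately if nothing is reported.
        if (epoll_wait(epfd, &out, 1, 0) == 1)
            ++reports;
    }

    close(epfd);
    close(p[0]);
    close(p[1]);
    return reports;
}
```

With LT the unread byte is reported on every call; with ET it is reported once, and nothing more is reported until new data arrives.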

2.1.3 Choosing Between Level Triggered and Edge Triggered#

ET places more demands on the application and requires more careful design from the programmer. LT looks much simpler, but once timeout control is required for each fd, an LT application has to traverse its fds as well, so ET, which already maintains and traverses an fd table, becomes the better choice.
Moreover, the number of fds returned by each epoll_wait call is limited (by maxevents). In high concurrency scenarios LT is kept very busy: every fd that is still ready stays in the ready queue, while only a portion of them can be returned to the application per call.
With ET, once epoll_wait has returned a set of fds, they are removed from the ready queue and are queued again only when a new event occurs on them. The ready queue therefore shrinks as epoll_wait returns, which gives ET a significant advantage in high concurrency situations.

2.2 epoll Function Prototype#

2.2.1 epoll_create#

int epoll_create(int size); // Creates an epoll handle; size is a hint for the number of fds to listen to (ignored since Linux 2.6.8, but it must be greater than 0)

Return value:
>=0 the created epoll handle (a file descriptor)
-1 failure, with errno set
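
As a small illustration of the return values, the sketch below creates epoll handles in two ways. The function names make_epoll and check_size_argument are made up for this example; epoll_create1 is a newer variant of epoll_create that takes flags instead of a size and is not used elsewhere in this article.

```cpp
#include <cassert>
#include <cerrno>
#include <sys/epoll.h>
#include <unistd.h>

// Create an epoll instance and return its descriptor, or -1 with errno set.
// EPOLL_CLOEXEC closes the descriptor automatically across exec().
int make_epoll()
{
    return epoll_create1(EPOLL_CLOEXEC);
}

// Exercise the size argument of the original epoll_create().
void check_size_argument()
{
    int ok = epoll_create(1);    // any positive size is accepted
    assert(ok >= 0);
    close(ok);

    errno = 0;
    int bad = epoll_create(0);   // size must be greater than zero
    assert(bad == -1 && errno == EINVAL);
}
```

The handle is an ordinary file descriptor, so it should be released with close once it is no longer needed.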

2.2.2 epoll_ctl#

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

The event registration function of epoll; it registers the types of events to listen for on a file descriptor.
Parameter description:

  • epfd: the handle returned by epoll_create
  • op: the action to perform, one of three macros: EPOLL_CTL_ADD registers a new fd with epfd, EPOLL_CTL_MOD modifies the events of an already registered fd, EPOLL_CTL_DEL removes an fd from epfd.
  • fd: the fd to listen to (generally the file descriptor returned by the socket function)
  • event: tells the kernel which events to listen for

The struct epoll_event structure is as follows:

typedef union epoll_data
{
 void *ptr;
 int fd;
 uint32_t u32;
 uint64_t u64;
} epoll_data_t;

struct epoll_event {
 uint32_t events; // Bit mask of event macros, e.g. EPOLLIN (readable), EPOLLOUT (writable), EPOLLPRI (urgent data), EPOLLET (edge triggered)
 epoll_data_t data; // User data (the union above), handed back unchanged by epoll_wait
};
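
The three operations can be wrapped as shown below. This is only a sketch: the names watch, rewatch and unwatch are hypothetical and do not appear in the CEpollServer class. Because data is a union, only one of its members can be used per registration; the fd member is chosen here so that the event loop can recover the descriptor later.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <sys/epoll.h>
#include <unistd.h>

// EPOLL_CTL_ADD: start watching fd for the given event mask.
// Returns 0 on success, -1 with errno set (EEXIST if fd is already watched).
int watch(int epfd, int fd, uint32_t events)
{
    assert(epfd >= 0 && fd >= 0);
    struct epoll_event ev = {};
    ev.events = events;
    ev.data.fd = fd;  // returned unchanged by epoll_wait
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

// EPOLL_CTL_MOD: replace the event mask of an fd that is already watched.
// Returns -1 with errno == ENOENT if fd was never added.
int rewatch(int epfd, int fd, uint32_t events)
{
    struct epoll_event ev = {};
    ev.events = events;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}

// EPOLL_CTL_DEL: stop watching fd. The event argument is ignored here;
// only kernels older than 2.6.9 required it to be non-null.
int unwatch(int epfd, int fd)
{
    return epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr);
}
```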

2.2.3 epoll_wait#

int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

Parameter description:

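  • epfd: the handle returned by epoll_create
  • events: an output array that the kernel fills with the events that occurred; based on each entry, the application decides whether to call accept to respond to a connection or to call read or write
  • maxevents: tells the kernel the capacity of the events array; it must be greater than 0 and is unrelated to the size passed to epoll_create
  • timeout: the maximum time to wait in milliseconds; 0 returns immediately and -1 blocks until an event occurs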

Function description:
Waits for events to occur on the socket fds registered in epfd (the file descriptor created by epoll_create); if they occur, the affected socket fds and their event types are placed into the events array. The return value is the number of ready fds, 0 if the timeout expired, or -1 on error.
A registration stays in effect after epoll_wait returns, so a socket fd does not have to be registered again for the next loop. The exception is an fd registered with EPOLLONESHOT: it is disabled after one event has been delivered, and epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev) must be used to re-arm its event types. EPOLL_CTL_ADD is not needed in that case, because the socket fd itself has not been removed, only its events have been disabled. When epoll_wait returns a value greater than 0, check each entry and call accept if the ready fd is the listening socket.
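
The return values of epoll_wait can be illustrated with a thin wrapper. The name wait_ready is an assumption made for this sketch; retrying on EINTR is one possible policy, and it restarts the full timeout on every retry.

```cpp
#include <cassert>
#include <cerrno>
#include <sys/epoll.h>
#include <unistd.h>

// Wait up to timeout_ms for events on epfd.
// Returns the number of entries written to `events` (at most maxevents),
// 0 if the timeout expired, or -1 on error with errno set.
int wait_ready(int epfd, struct epoll_event* events, int maxevents, int timeout_ms)
{
    assert(events != nullptr && maxevents > 0);
    int n;
    do {
        n = epoll_wait(epfd, events, maxevents, timeout_ms);
    } while (n == -1 && errno == EINTR);  // interrupted by a signal: retry
    return n;
}
```

If more descriptors are ready than maxevents allows, the call returns maxevents entries and the rest are reported by later calls, so maxevents only has to match the capacity of the array passed in.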

2.3 Implementation of epoll#

2.3.1 Calling Process of epoll Function#

socket/bind/listen/epoll_create/epoll_ctl/epoll_wait/accept/read/write/close
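
Before the class-based implementation, the call sequence above can be sketched as three plain functions. This is a simplified illustration and not the author's code: the names make_nonblocking, open_listener, open_epoll and poll_once are hypothetical, the server binds to 127.0.0.1 only, it uses level-triggered mode to stay short, and it echoes data back instead of sending a fixed reply.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

static bool make_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    return flags >= 0 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}

// socket -> bind -> listen on 127.0.0.1. Passing *port == 0 lets the kernel
// pick a free port, which is written back. Returns the listening fd or -1.
int open_listener(uint16_t* port)
{
    assert(port != nullptr);
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;
    sockaddr_in addr = {};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(*port);
    socklen_t len = sizeof(addr);
    if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0 ||
        listen(fd, 16) < 0 ||
        getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) < 0 ||
        !make_nonblocking(fd)) {
        close(fd);
        return -1;
    }
    *port = ntohs(addr.sin_port);
    return fd;
}

// epoll_create -> epoll_ctl: create the epoll handle and watch the listener.
int open_epoll(int listen_fd)
{
    int epfd = epoll_create1(0);
    if (epfd < 0)
        return -1;
    epoll_event ev = {};
    ev.events = EPOLLIN;  // level-triggered keeps this sketch short
    ev.data.fd = listen_fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev) < 0) {
        close(epfd);
        return -1;
    }
    return epfd;
}

// epoll_wait -> accept / read / write / close: handle one batch of events.
// Returns what epoll_wait returned: events handled, 0 on timeout, -1 on error.
int poll_once(int epfd, int listen_fd, int timeout_ms)
{
    epoll_event events[16];
    int n = epoll_wait(epfd, events, 16, timeout_ms);
    for (int i = 0; i < n; ++i) {
        int fd = events[i].data.fd;
        if (fd == listen_fd) {
            int conn = accept(listen_fd, nullptr, nullptr);
            if (conn < 0)
                continue;
            make_nonblocking(conn);
            epoll_event ev = {};
            ev.events = EPOLLIN;
            ev.data.fd = conn;
            if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn, &ev) < 0)
                close(conn);
        } else {
            char buf[512];
            ssize_t got = read(fd, buf, sizeof(buf));
            if (got > 0) {
                // Echo the data back; a short write is ignored in this sketch.
                ssize_t sent = write(fd, buf, static_cast<size_t>(got));
                (void)sent;
            } else if (got == 0 || (errno != EAGAIN && errno != EINTR)) {
                // Peer closed or hard error. Closing the last descriptor of
                // a socket also removes it from the epoll set.
                close(fd);
            }
        }
    }
    return n;
}
```

A server built from these pieces would call open_listener and open_epoll once and then call poll_once in a loop.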

2.3.2 Code Implementation#

First, add a method to the CTcp class that sets a socket to non-blocking:

int CTcp::SetNoblock (int nSock)
{
    assert (m_nSock != -1);
    int nFlags;

    if ( nSock == -1 )
    {
        nSock = m_nSock;
    }

    if ((nFlags = fcntl (nSock, F_GETFL, 0)) < 0)
        return 0;

    nFlags = nFlags | O_NONBLOCK;

    if (fcntl (nSock, F_SETFL, nFlags) < 0)
        return 0;

    return 1;
}

Then, based on the CTcp class, implement the CEpollServer class, as follows:

//EpollServer.h
#ifndef __EPOLL_SERVER_H__
#define __EPOLL_SERVER_H__

#include "SxTcp.h"

//Epoll server class
class CEpollServer
{

//Constructor
public:
    CEpollServer ();
    virtual ~CEpollServer ();

//Public member functions
public:
    int CreateEpoll(const char* szIp, int nPort, int nSize);
    int ProcessEpoll();
    int CloseEpoll();

//Private member variables
private:
    CTcp m_cTcp;
    int m_nEpollFd;
};

#endif

//EpollServer.cpp
#include "EpollServer.h"
#include <sys/epoll.h>
#include "TypeError.h"
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>

CEpollServer::CEpollServer ()
{
    m_nEpollFd = -1;
}

CEpollServer::~CEpollServer ()
{
    CloseEpoll();
    m_cTcp.Close();
}


/*Create epoll handle
  Input parameters:
  szIp server IP address
  nPort port to bind
  nSize number of file descriptors to listen to
  Output parameters: 1: success; 0: failure
*/
int CEpollServer::CreateEpoll(const char* szIp, int nPort, int nSize)
{
    assert(szIp != nullptr);
    int iRet = 0;
    int size = (nSize > 0 ? nSize : DEFAULT_EPOLL_FD_NUM);
   
    iRet = m_cTcp.Open();
    if ( iRet ==  0 )
    {
        return SOCKET_ERROR;
    }

    iRet = m_cTcp.Bind(szIp, nPort);
    if ( iRet == 0 )
    {
        return BIND_ERROR;
    }

    iRet = m_cTcp.SetNoblock();
    if ( iRet == 0 )
    {
        return SETSOCKOPT_ERROR;
    }

    iRet = m_cTcp.Listen(size + 1); // listen() backlog: the length of the pending-connection queue, not the number of fds epoll monitors
    if ( iRet == 0)
    {
        return LISTEN_ERROR;
    }

    if ( m_nEpollFd != -1 )
    {
        CloseEpoll();
    }

    m_nEpollFd = epoll_create(size);
    if ( m_nEpollFd == -1)
    {
        return EPOLL_CREATE_ERROR;
    }

    return 1;
}

/*Process epoll events
  Output parameters: 1: success; 0: failure
*/
int CEpollServer::ProcessEpoll()
{
    assert(m_nEpollFd != -1);
    int nFds = 0;
    int connFd = -1, readFd = -1, writeFd = -1;
    int n = 0, nSize = 0;
    int nListenFd = -1;
    char buf[MAX_READ_SIZE] = {0};
    struct sockaddr_in clientAddr;
    socklen_t clilen = sizeof(clientAddr); // accept() requires the buffer size on input
    struct epoll_event ev, events[20];
    memset((void*)&ev, 0, sizeof(ev));
    nListenFd = m_cTcp.GetHandle();
    ev.data.fd = nListenFd;
    ev.events = EPOLLIN | EPOLLET;
    if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, nListenFd, &ev) == -1 )
    {
        return EPOLL_CTL_ERROR;
    }
    while(1)
    {
        n = 0;
        nSize = 0;
        nFds = epoll_wait(m_nEpollFd, events, 20, 500);
        for (int i = 0; i< nFds; ++i)
        {
            memset(buf, 0, MAX_READ_SIZE);
            if (events[i].data.fd == nListenFd )
            {
                while ( (connFd = accept(nListenFd, (sockaddr*)&clientAddr, &clilen)) > 0 )
                {
                    m_cTcp.SetNoblock(connFd);  // ET mode requires non-blocking
                    ev.data.fd = connFd;
                    ev.events = EPOLLIN | EPOLLET;
                    if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, connFd, &ev) == -1 )
                    {
                        return EPOLL_CTL_ERROR;
                    }
                }
                if ( connFd == -1 && errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR )
                {
                    return ACCEPT_ERROR;
                }
                continue;
            }
            else if(events[i].events & EPOLLIN)
            {
                readFd = events[i].data.fd;
                if (readFd < 0)
                {
                    continue;
                }
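                // Read data until the socket is drained (EAGAIN), the peer closes, or buf is full
                n = 0;
                while ( n < MAX_READ_SIZE - 1 &&
                        (nSize = read(readFd, buf+n, MAX_READ_SIZE - 1 - n)) > 0 )
                {
                    n += nSize;
                }
                // read() == 0 means the peer closed; EAGAIN only means the socket is drained
                bool bClose = (nSize == 0) || (nSize == -1 && errno != EAGAIN);
                if (nSize == -1 && errno != EAGAIN )
                {
                    fprintf(stderr, "epoll read failed\n");
                    //SingleLog::WriteLog(ERROR, "%s", "epoll read failed");
                }

                fprintf(stdout, "read data is:%s\n", buf);

                if (bClose)
                {
                    close(readFd); // closing also removes the fd from the epoll set
                    continue;
                }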
               
                ev.data.fd = readFd;
                ev.events = EPOLLOUT | EPOLLET; // Edge Triggered (ET)
                epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, readFd, &ev);
            }
            else if(events[i].events & EPOLLOUT)
            {
                writeFd = events[i].data.fd;
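                // Write data
                strncpy(buf, "hello client", sizeof(buf)-1);
                int dataSize = strlen(buf);
                n = dataSize;
                while(n > 0)
                {
                    nSize = write(writeFd, buf + dataSize - n, n);
                    if (nSize == -1)
                    {
                        if (errno == EINTR)
                        {
                            continue; // interrupted by a signal, retry
                        }
                        // EAGAIN: the send buffer is full; any other errno is a real error.
                        // Stop in both cases; a complete server would keep the unsent
                        // bytes and resume on the next EPOLLOUT event.
                        break;
                    }
                    n -= nSize;
                }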

                ev.data.fd = writeFd;
                ev.events = EPOLLIN | EPOLLET;
                epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, writeFd, &ev);
            }
        }
    }
}

/*
Close epoll file descriptor
*/
int CEpollServer::CloseEpoll()
{
    if (m_nEpollFd != -1)
    {
        close (m_nEpollFd);
        m_nEpollFd = -1;
    }
    return 1;

}
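
The read branch of ProcessEpoll stores data in a fixed buffer. As an alternative sketch (an assumption of this example, not part of the CEpollServer class), the draining that edge-triggered mode requires can be factored into a helper that appends to a std::string and tells the caller why it stopped. The names DrainResult and drain are hypothetical.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <string>
#include <unistd.h>

enum class DrainResult { WouldBlock, PeerClosed, Error };

// Read from a non-blocking fd until it has nothing more to give, appending
// everything to `out`. In edge-triggered mode the loop has to run until
// EAGAIN, because no new event arrives for data that is already buffered.
DrainResult drain(int fd, std::string& out)
{
    int flags = fcntl(fd, F_GETFL, 0);
    assert(flags != -1 && (flags & O_NONBLOCK));  // a blocking fd would hang here
    (void)flags;

    char chunk[4096];
    for (;;) {
        ssize_t got = read(fd, chunk, sizeof(chunk));
        if (got > 0) {
            out.append(chunk, static_cast<size_t>(got));
        } else if (got == 0) {
            return DrainResult::PeerClosed;   // orderly shutdown by the peer
        } else if (errno == EINTR) {
            continue;                         // interrupted by a signal, retry
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return DrainResult::WouldBlock;   // drained, wait for the next event
        } else {
            return DrainResult::Error;
        }
    }
}
```

On PeerClosed or Error the caller would close the fd; on WouldBlock it keeps the fd registered and processes whatever was appended to out.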

Combine the above CEpollServer class and TCP class into a dynamic library, the makefile is as follows:

LIB_DIR=./lib
src=$(wildcard *.cpp)
obj=$(patsubst %.cpp,%.o,$(src))
PIC=-fPIC
LIBSO=-shared
#CC=g++ -gdwarf-2 -gstrict-dwarf
CC=g++ -g

%.o:%.cpp
    $(CC) -c $< $(PIC)

network:$(obj)
    $(CC) -o libnetwork.so $^ $(LIBSO)
    cp -f libnetwork.so ../test/lib

clean:
    rm -f *.o *.so

Then implement TestEpollServer.cpp as follows:
Note: The following ConfigIni and SingleLog are libraries I wrote during testing; modifications are needed to use the code below!

//TestEpollServer.cpp
#include "../../readini/ConfigIni.h"
#include <string>
#include "../../network/EpollServer.h"
#include "../../log/SingleLog.h"
using namespace std; // string is used unqualified below

CEpollServer g_clEpollServer;
#define FILEDIR "./socket.ini"

//epoll server
int epoll_server_init()
{   
    int iRet = -1;
    string strIp;
    int nPort = 0, nEpollNum = 0, nTimeout = 0;
    ConfigIni::Init(string(FILEDIR));
    strIp = ConfigIni::ReadStr(string("SERVER"), string("Addr"));
    if (strIp == "")
    {
        SingleLog::WriteLog(ERROR,"read server addr failed");
        return iRet;
    }

    nPort = ConfigIni::ReadInt(string("SERVER"), string("Port"));
    if ( nPort == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server port failed");
        return iRet;
    }

    nEpollNum = ConfigIni::ReadInt(string("SERVER"), string("MaxEpollNum"));
    if ( nEpollNum == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server epoll num failed");
        return iRet;
    }

    nTimeout = ConfigIni::ReadInt(string("SERVER"), string("Timeout"));
    if ( nTimeout == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server timeout failed");
        return iRet;
    }

    iRet = g_clEpollServer.CreateEpoll(strIp.c_str(), nPort, nEpollNum);
    if ( iRet == 0 )
    {
        SingleLog::WriteLog(ERROR, "epoll create failed");
        return -1;
    }
   
    return 0;
}

void epoll_server_run()
{
    g_clEpollServer.ProcessEpoll();
}

int main()
{
    SingleLog::Init();
    if (epoll_server_init() == -1)
    {
        return -1;
    }
    epoll_server_run();
    return 0;
}

//TestClient.cpp
#include <stdio.h>
#include <iostream>
#include <string.h>
#include <errno.h>
#include "../../network/SxTcp.h"
using namespace std;

int main()
{
    CTcp tcp;
    int iRet = 0;
    int iFd = 0;
    char buf[128] = {0};
   
    iRet = tcp.Open();
    if (iRet == 0)
    {
        perror("socket create failed");
        return -1;
    }

    iRet = tcp.Connect("192.168.233.250", 6666);
    if (iRet == 0)
    {
        perror("socket connect failed");
        return -1;
    }

    while(1)
    {
        memset(buf, 0, sizeof(buf));
        cout << "please input some string:";
        cin >> buf;
        iRet = tcp.Send(buf, strlen(buf));
        if (iRet < 0 && errno != EAGAIN)
        {
            perror("send failed");
            return -1;
        }
        else if(iRet == 0)
        {
            perror("connect is closed");
            return -1;
        }

        memset(buf, 0, sizeof(buf));
        iRet = tcp.Recv(buf, sizeof(buf));
        if (iRet < 0 && errno != EAGAIN)
        {
            perror("recv failed");
            return -1;
        }
        else if(iRet == 0)
        {
            perror("socket not connect");
            return -1;
        }

        fprintf(stdout, "recv data is:%s\n", buf);
    }

    return 0;
}

Compile TestEpollServer.cpp and TestClient.cpp separately to generate the server and client applications for communication.

All content written by me on JianShu is original; reproduction requires my consent. JianShu homepage: https://www.jianshu.com/u/e8c7bb5e3257
