redis网络模型
redis的网络涉及以下文件(与客户端)
- server.c(redis server入口)
- ae.c(redis 网络模型ae对象, 封装了epoll/select/kqueue/evport)
- ae_epoll.c/ae_evport.c/ae_kqueue.c/ae_select.c(操作系统的网络系统函数)
- anet.c(对socket的操作的封装, 获取信息,设置,监听,连接,读写等)
- networking.c(网络事件处理入口, 读写处理, 返回, 客户端处理等)
从redis server建立监听到与客户端建立连接, 再到读请求[命令]处理流程
main => initServer => listenToPort => anetTcpServer => _anetTcpServer
=> aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)
acceptTcpHandler => anetTcpAccept = > acceptCommonHandler => createClient(绑定读[读事件]写[客户端对象写缓冲区]) =>
=> readQueryFromClient => read => processInputBufferAndReplicate = >processInputBuffer => processCommand => c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr) => call(c,CMD_CALL_FULL) => c->cmd->proc(c)
aeCreateFileEvent => aeApiAddEvent(eventLoop->events) => epoll_ctl(epoll)
全局只有一个Reactor
aeEventLoop
typedef struct aeEventLoop { aeFileEvent *events; aeFiredEvent *fired; aeTimeEvent *timeEventHead; } aeEventLoop;
aeEventLoop 全局网络模型对象, 包含了注册事件和触发事件数组用于存放fd
aeApiPoll
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
...
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
...
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
调用多路复用API,只会在超时或有事件触发时返回, 可以看到会把触发的事件添加到eventLoop->fired, 然后在接下来处理
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
...
//多路复用id模型
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0;
int invert = fe->mask & AE_BARRIER;
//处理读事件, 客户端请求连接
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
//处理写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
//处理读事件
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
//定时事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed;
}