Memcached源码阅读十六 线程交互

  • 2021-01-11
  • 浏览 (28)

Memcached源码阅读十六 线程交互

Memcached按之前的分析可以知道,其是典型的Master-Worker线程模型,这种模型很典型,其工作模型是Master绑定端口,监听网络连接,接受网络连接之后,通过线程间通信来唤醒Worker线程,Worker线程已经连接的描述符执行读写操作,这种模型简化了整个通信模型,下面分析下这个过程。

case conn_listening:
    addrlen = sizeof(addr);
    //Master线程(main)进入状态机之后执行accept操作,这个操作也是非阻塞的。
    if ((sfd = accept(c->sfd, (struct sockaddr *) &addr, &addrlen)) == -1)
    {
        //非阻塞模型,这个错误码继续等待
        if (errno == EAGAIN || errno == EWOULDBLOCK)
        {
            stop = true;
        }
        //连接超载
        else if (errno == EMFILE)
        {
            if (settings.verbose > 0)
                 fprintf(stderr, "Too many open connections\n");
            accept_new_conns(false);
            stop = true;
        }
        else
        {
            perror("accept()");
            stop = true;
        }
        break;
    }
    //已经accept成功,将accept之后的描述符设置为非阻塞的
    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0
            || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)
    {
        perror("setting O_NONBLOCK");
        close(sfd);
        break;
    }
    //判断是否超过最大连接数
    if (settings.maxconns_fast
           && stats.curr_conns + stats.reserved_fd >= settings.maxconns - 1)
    {
        str = "ERROR Too many open connections\r\n";
        res = write(sfd, str, strlen(str));
        close(sfd);
        STATS_LOCK();
        stats.rejected_conns++;
        STATS_UNLOCK();
    }
    else
    {       

        //直线连接分发   
        dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                   DATA_BUFFER_SIZE, tcp_transport);
     }

     stop = true;
     break;

这个是TCP的连接建立过程,由于UDP不需要建立连接,所以直接分发给Worker线程,让Worker线程进行读写操作,而TCP在建立连接之后,也执行连接分发(和UDP的一样),下面看看dispatch_conn_new内部是如何进行链接分发的。

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) 
{
    //创建一个连接队列
    CQ_ITEM *item = cqi_new();
    char buf[1];
    //通过round-robin算法选择一个线程
    int tid = (last_thread + 1) % settings.num_threads;

    //thread数组存储了所有的工作线程
    LIBEVENT_THREAD *thread = threads + tid;

    //缓存这次的线程编号,下次待用
    last_thread = tid;

    //sfd表示accept之后的描述符
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    //投递item信息到Worker线程的工作队列中
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    //在Worker线程的notify_send_fd写入字符c,表示有连接   
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

投递到子线程的连接队列之后,同时,通过往子线程的PIPE管道写入字符c来,下面我们看看子线程是如何处理的?

//子线程会在PIPE管道读上面建立libevent事件,事件回调函数是thread_libevent_process
event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);

static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    //PIPE管道读取一个字节的数据
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    switch (buf[0]) 
    {
    case 'c':
    //从连接队列读出Master线程投递的消息
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);//创建连接
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
        break;
    }
}

之前分析过conn_new的执行流程,conn_new里面会建立sfd的网络监听libevent事件,事件回调函数为event_handler

event_set(&c->event, sfd, event_flags, event_handler, (void *) c);
event_base_set(base, &c->event);

event_handler的执行流程最终会进入到业务处理的状态机中,关于状态机,后续分析。