Memcached源码分析三 网络连接建立

  • 2021-01-11
  • 浏览 (29)

Memcached源码分析三 网络连接建立

接着上一篇继续分析,上一篇请参考 《Memcached源码阅读之网络监听的建立》,这篇主要分析TCP的连接建立(从前面的代码分析可以看出,这个过程是由主线程驱动的),UDP没有连接建立的过程,所以之间进行连接分发,我们后续分析,现在直接上代码进行讲解。

conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport,
               struct event_base *base)
{
    conn *c = conn_from_freelist();
    //获取一个空闲连接,conn是Memcached内部对网络连接的一个封装
    //如果没有空闲的连接
    if (NULL == c)
    {
        if (!(c = (conn *)calloc(1, sizeof(conn))))//申请空间
        {
            fprintf(stderr, "calloc()\n");
            return NULL;

        }MEMCACHED_CONN_CREATE(c);

        //进行一些初始化
        c->rbuf = c->wbuf = 0;
        c->ilist = 0;
        c->suffixlist = 0;
        c->iov = 0;
        c->msglist = 0;
        c->hdrbuf = 0;

        c->rsize = read_buffer_size;
        c->wsize = DATA_BUFFER_SIZE;
        c->isize = ITEM_LIST_INITIAL;
        c->suffixsize = SUFFIX_LIST_INITIAL;
        c->iovsize = IOV_LIST_INITIAL;
        c->msgsize = MSG_LIST_INITIAL;
        c->hdrsize = 0;

        //每个conn都自带读入和输出缓冲区,在进行网络收发数据时,特别方便
        c->rbuf = (char *)malloc((size_t)c->rsize);
        c->wbuf = (char *)malloc((size_t)c->wsize);
        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
        c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize);
        c->msglist = (struct msghdr *) malloc(
            sizeof(struct msghdr) * c->msgsize);

        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0
            || c->msglist == 0 || c->suffixlist == 0)
        {
            conn_free(c);
            fprintf(stderr, "malloc()\n");
            return NULL;
        }

        STATS_LOCK();
        //统计变量更新
        stats.conn_structs++;
        STATS_UNLOCK();
    }

    c->transport = transport;
    c->protocol = settings.binding_protocol;

    if (!settings.socketpath)
    {
        c->request_addr_size = sizeof(c->request_addr);
    }
    else
    {
        c->request_addr_size = 0;
    }

    //输出一些日志信息
    if (settings.verbose > 1)
    {

        if (init_state == conn_listening)
        {
            fprintf(stderr, "<%d server listening (%s)\n", sfd,
                prot_text(c->protocol));
        }
        else if (IS_UDP(transport))
        {
            fprintf(stderr, "<%d server listening (udp)\n", sfd);
        }
        else if (c->protocol == negotiating_prot)
        {
            fprintf(stderr, "<%d new auto-negotiating client connection\n",
                sfd);
        }
        else if (c->protocol == ascii_prot)
        {
            fprintf(stderr, "<%d new ascii client connection.\n", sfd);
        }
        else if (c->protocol == binary_prot)
        {
            fprintf(stderr, "<%d new binary client connection.\n", sfd);
        }
        else
        {
            fprintf(stderr, "<%d new unknown (%d) client connection\n", sfd,
                c->protocol);
            assert(false);
        }
    }

    c->sfd = sfd;
    c->state = init_state;
    c->rlbytes = 0;
    c->cmd = -1;
    c->rbytes = c->wbytes = 0;
    c->wcurr = c->wbuf;
    c->rcurr = c->rbuf;
    c->ritem = 0;
    c->icurr = c->ilist;
    c->suffixcurr = c->suffixlist;
    c->ileft = 0;
    c->suffixleft = 0;
    c->iovused = 0;
    c->msgcurr = 0;
    c->msgused = 0;

    c->write_and_go = init_state;
    c->write_and_free = 0;
    c->item = 0;

    c->noreply = false;
    //建立sfd描述符上面的event事件,事件回调函数为event_handler
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;
    
    if (event_add(&c->event, 0) == -1)
       {
        //如果建立libevent事件失败,将创建的conn添加到空闲列表中
        if (conn_add_to_freelist(c))
        {
            conn_free(c);
        }
        perror("event_add");
        return NULL;
    }

    STATS_LOCK();
    //统计信息更新
    stats.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();

    MEMCACHED_CONN_ALLOCATE(c->sfd);

    return c;
}

//获得conn
conn *conn_from_freelist()
{
    conn *c;

    pthread_mutex_lock(&conn_lock);//操作链表,加锁,保持同步
    //freecurr为静态全局变量     
    if (freecurr > 0)
    {
        //freeconns是在Memcached启动时初始化的
        c = freeconns[--freecurr];
    }
    else//没有conn
    {
        c = NULL;
    }
    pthread_mutex_unlock(&conn_lock);

    return c;
}

//添加conn到空闲链表中
bool conn_add_to_freelist(conn *c)
{
    bool ret = true;
    pthread_mutex_lock(&conn_lock);

    //freeconns还有空间
    if (freecurr < freetotal)
    {
        freeconns[freecurr++] = c;//直接添加
        ret = false;
    }
    else
    {
        //没有多余空间,进行扩容,按目前容量的2倍进行扩容
        size_t newsize = freetotal * 2;
        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
        if (new_freeconns)
        {
            freetotal = newsize;
            freeconns = new_freeconns;
            freeconns[freecurr++] = c;
            ret = false;
        }
    }
    pthread_mutex_unlock(&conn_lock);
    return ret;
}

//libevent事件回调函数的处理,回调函数被调用时,表明Memcached监听的端口号有网络事件到了
void event_handler(const int fd, const short which, void *arg)
{
    conn *c;

    c = (conn *)arg;
    assert(c != NULL);

    c->which = which;

    //这种情况应该很少出现  
    if (fd != c->sfd)
    {
        if (settings.verbose > 0)
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");

        conn_close(c);
        return;
    }

    //进入业务处理状态机
    drive_machine(c);

    return;
}