Memcached源码阅读二 网络监听的建立

  • 2021-01-11
  • 浏览 (28)

Memcached源码阅读二 网络监听的建立

Memcahced是一个服务器程序,所以需要建立网络监听来接受其他客户端机器的连接,下面分析下其过程,这次分析是基于Memcached 1.4.15版本分析的。

// 如果socketpath为空,则表示使用的TCP / UDP, 不是使用unix socket
//如果socketpath为空,则表示使用的TCP/UDP,不是使用unix socket
if (settings.socketpath == NULL)
{      
    //可以从环境变量读取端口文件所在的文件路径
    const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
    char temp_portnumber_filename[PATH_MAX];
    FILE *portnumber_file = NULL;

    
    //如果端口文件不为空,则打开
    if (portnumber_filename != NULL)
       {
        snprintf(temp_portnumber_filename, sizeof(temp_portnumber_filename),
            "%s.lck", portnumber_filename);

        portnumber_file = fopen(temp_portnumber_filename, "a");

        if (portnumber_file == NULL)
        {
            fprintf(stderr, "Failed to open \"%s\": %s\n",
                temp_portnumber_filename, strerror(errno));
        }
    }

    //settings.port表示Memcached采用的是TCP协议,创建TCP Socket,监听并且绑定
    errno = 0;
    if (settings.port
        && server_sockets(settings.port, tcp_transport, portnumber_file))
    {
        vperror("failed to listen on TCP port %d", settings.port);
        exit(EX_OSERR);
    }

    //settings.udpport表示Memcached采用的是UDP协议,创建UDP Socket,监听并且绑定
    errno = 0;
    if (settings.udpport
        && server_sockets(settings.udpport, udp_transport, portnumber_file))
    {
        vperror("failed to listen on UDP port %d", settings.udpport);
        exit(EX_OSERR);
    }

    //端口文件不为空
    if (portnumber_file)
    {
        fclose(portnumber_file);//关闭文件
        rename(temp_portnumber_filename, portnumber_filename);//重命名端口文件
    }
}

//TCP和UDP使用的是同一个接口来创建监听和绑定
static int server_sockets(int port, enum network_transport transport,
    FILE *portnumber_file)
{

    //settings.inter指定的是要绑定的ip地址信息,如果为空,则表示是绑定本机一个ip
    if (settings.inter == NULL)
    {   
        //执行监听和绑定操作      
        return server_socket(settings.inter, port, transport, portnumber_file);
    }
    //如果服务器有多个ip信息,可以在每个(ip,port)上面绑定一个Memcached实例,下面是一些输入参数的解析,解析完毕之后,执行绑定
    else
    {
        // tokenize them and bind to each one of them..
        char *b;
        int ret = 0;
        char *list = strdup(settings.inter);

        if (list == NULL)
        {
            fprintf(stderr,
                "Failed to allocate memory for parsing server interface string\n");
            return 1;
        }

        for (char *p = strtok_r(list, ";,", &b); p != NULL;
            p = strtok_r(NULL, ";,", &b))
        {

            int the_port = port;
            char *s = strchr(p, ':');

            if (s != NULL)
            {
                *s = '\0';
                ++s;
                if (!safe_strtol(s, &the_port))
                {

                    fprintf(stderr, "Invalid port number: \"%s\"", s);
                    return 1;
                }
            }

            if (strcmp(p, "*") == 0)
            {
                p = NULL;
            }

            //绑定多次,循环调用单个的绑定函数      
            ret |= server_socket(p, the_port, transport, portnumber_file);
        }

        free(list);
        return ret;
    }
}

//执行真正的绑定
static int server_socket(const char *interface, int port,
    enum network_transport transport, FILE *portnumber_file)
{
    int sfd;
    struct linger ling = { 0, 0 };
    struct addrinfo *ai;
    struct addrinfo *next;

    //设定协议无关,用于监听的标志位
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,.ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    int flags = 1;

    //指定socket的类型,如果是udp,则用数据报协议,如果是tcp,则用数据流协议
    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

    if (port == -1)
    {
        port = 0;
    }
    snprintf(port_buf, sizeof(port_buf), "%d", port);

    //调用getaddrinfo,将主机地址和端口号映射成为socket地址信息,地址信息由ai带回
    error = getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0)
    {
        if (error != EAI_SYSTEM)
            fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
        else
            perror("getaddrinfo()");
        return 1;
    }

   /*getaddrinfo返回多个addrinfo的情形有如下两种:
      1.如果与interface参数关联的地址有多个,那么适用于所请求地址簇的每个地址都返回一个对应的结构。
      2.如果port_buf参数指定的服务支持多个套接口类型,那么每个套接口类型都可能返回一个对应的结构。
    */    
    for (next = ai; next; next = next->ai_next)
    {
        conn *listen_conn_add;
        //为每个地址信息建立socket
        if ((sfd = new_socket(next)) == -1)
        {
            //建立socket过程中可能发生的,比如打开文件描述符过多等
            if (errno == EMFILE)
            {
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }

#ifdef IPV6_V6ONLY
        if (next->ai_family == AF_INET6)
        {  
           //设定IPV6的选项值,设置了IPV6_V6ONLY,表示只收发IPV6的数据包,此时IPV4和IPV6可以绑定到同一个端口而不影响数据的收发
            error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flags,
                sizeof(flags));

            if (error != 0)
            {
                perror("setsockopt");
                close(sfd);
                continue;
            }
        }
#endif

    //设定socket选项,SO_REUSEADDR表示重用地址信息,具体重用哪些东西自行学习,必须在bind操作之前设置
    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));

    if (IS_UDP(transport)){
            maximize_sndbuf(sfd);//扩大发送缓冲区
    }
    else
    {  
        //设定socket选项,SO_KEEPALIVE表示保活
        error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
            sizeof(flags));
        if (error != 0)
            perror("setsockopt");

        //设定socket选项,SO_LINGER表示执行close操作时,如果缓冲区还有数据,可以继续发送
        error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling,
            sizeof(ling));
        if (error != 0)
            perror("setsockopt");

        //设定IP选项,TCP_NODELAY表示禁用Nagle算法
        error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
            sizeof(flags));
        if (error != 0)
            perror("setsockopt");
    }
    
    if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1)
    {
        if (errno != EADDRINUSE)
        {
            perror("bind()");
            close(sfd);
            freeaddrinfo(ai);
            return 1;
        }

        close(sfd);
        continue;
    }
    else
    {
        success++;
        //如果不是UDP协议,则执行监听操作,监听队列为初始启动的值
        if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1)
        {
            perror("listen()");
            close(sfd);
            freeaddrinfo(ai);
            return 1;
        }

        if (portnumber_file != NULL
            && (next->ai_addr->sa_family == AF_INET
                || next->ai_addr->sa_family == AF_INET6))
        {
            union
            {
                struct sockaddr_in in;
                struct sockaddr_in6 in6;
            } my_sockaddr;
            socklen_t len = sizeof(my_sockaddr);

            //这时还没连接建立,调用getsockname不知道有什么用?                if (getsockname(sfd, (struct sockaddr*) &my_sockaddr, &len) == 0) {
            if (next->ai_addr->sa_family == AF_INET)
            {
                fprintf(portnumber_file, "%s INET: %u\n",
                    IS_UDP(transport) ? "UDP" : "TCP",
                    ntohs(my_sockaddr.in.sin_port));
            }
            else
            {
                fprintf(portnumber_file, "%s INET6: %u\n",
                    IS_UDP(transport) ? "UDP" : "TCP",
                    ntohs(my_sockaddr.in6.sin6_port));
            }
        }
    }
}

if (IS_UDP(transport))
{
    int c;

    for (c = 0; c < settings.num_threads_per_udp; c++)
    {
        //分发连接,因为UDP没有连接建立的过程,直接进行连接的分发
        dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, transport);
    }
}
else
{
    //TCP,建立连接
    if (!(listen_conn_add = conn_new(sfd, conn_listening, EV_READ |
             EV_PERSIST, 1, transport, main_base)))     
       {
        fprintf(stderr, "failed to create listening connection\n");
        exit(EXIT_FAILURE);
    }

    //建立的连接组成链表
    listen_conn_add->next = listen_conn;
    listen_conn = listen_conn_add;

  
       //释放资源
    freeaddrinfo(ai);    
    return success == 0;
}

//建立socket
static int new_socket(struct addrinfo *ai)
{
    int sfd;
    int flags;

    //调用系统函数建立socket
    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
    {
        return -1;
    }

    //设定socket为非阻塞的
    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0
        || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)
    {
        perror("setting O_NONBLOCK");
        close(sfd);
        return -1;
    }
    return sfd;
}

//如果是UDP协议,调整发送缓存到最大值
static void maximize_sndbuf(const int sfd)
{
    socklen_t intsize = sizeof(int);
    int last_good = 0;
    int min, max, avg;
    int old_size;

    //读取socket的选项,SO_SNDBF表示发送缓存
    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
    {
        if (settings.verbose > 0)
            perror("getsockopt(SO_SNDBUF)");
        return;
    }

    //二分搜索来设定,很巧的设计
    min = old_size;
    max = MAX_SENDBUF_SIZE;

    while (min <= max)
    {
        avg = ((unsigned int)(min + max)) / 2;
        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
        {
            last_good = avg;
            min = avg + 1;
        }
        else
        {
            max = avg - 1;
        }
    }

    if (settings.verbose > 1)
        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size,
            last_good);
}

至此,网络相关的部分已经完成,后向连接的建立(conn_new)和连接分发(dispatch_conn_new),我们放到其他文章中进行分析。