Memcached源码阅读二 网络监听的建立
Memcahced
是一个服务器程序,所以需要建立网络监听来接受其他客户端机器的连接,下面分析下其过程,这次分析是基于Memcached 1.4.15
版本分析的。
// 如果socketpath为空,则表示使用的TCP / UDP, 不是使用unix socket
//如果socketpath为空,则表示使用的TCP/UDP,不是使用unix socket
if (settings.socketpath == NULL)
{
//可以从环境变量读取端口文件所在的文件路径
const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
char temp_portnumber_filename[PATH_MAX];
FILE *portnumber_file = NULL;
//如果端口文件不为空,则打开
if (portnumber_filename != NULL)
{
snprintf(temp_portnumber_filename, sizeof(temp_portnumber_filename),
"%s.lck", portnumber_filename);
portnumber_file = fopen(temp_portnumber_filename, "a");
if (portnumber_file == NULL)
{
fprintf(stderr, "Failed to open \"%s\": %s\n",
temp_portnumber_filename, strerror(errno));
}
}
//settings.port表示Memcached采用的是TCP协议,创建TCP Socket,监听并且绑定
errno = 0;
if (settings.port
&& server_sockets(settings.port, tcp_transport, portnumber_file))
{
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
//settings.udpport表示Memcached采用的是UDP协议,创建UDP Socket,监听并且绑定
errno = 0;
if (settings.udpport
&& server_sockets(settings.udpport, udp_transport, portnumber_file))
{
vperror("failed to listen on UDP port %d", settings.udpport);
exit(EX_OSERR);
}
//端口文件不为空
if (portnumber_file)
{
fclose(portnumber_file);//关闭文件
rename(temp_portnumber_filename, portnumber_filename);//重命名端口文件
}
}
//TCP和UDP使用的是同一个接口来创建监听和绑定
static int server_sockets(int port, enum network_transport transport,
FILE *portnumber_file)
{
//settings.inter指定的是要绑定的ip地址信息,如果为空,则表示是绑定本机一个ip
if (settings.inter == NULL)
{
//执行监听和绑定操作
return server_socket(settings.inter, port, transport, portnumber_file);
}
//如果服务器有多个ip信息,可以在每个(ip,port)上面绑定一个Memcached实例,下面是一些输入参数的解析,解析完毕之后,执行绑定
else
{
// tokenize them and bind to each one of them..
char *b;
int ret = 0;
char *list = strdup(settings.inter);
if (list == NULL)
{
fprintf(stderr,
"Failed to allocate memory for parsing server interface string\n");
return 1;
}
for (char *p = strtok_r(list, ";,", &b); p != NULL;
p = strtok_r(NULL, ";,", &b))
{
int the_port = port;
char *s = strchr(p, ':');
if (s != NULL)
{
*s = '\0';
++s;
if (!safe_strtol(s, &the_port))
{
fprintf(stderr, "Invalid port number: \"%s\"", s);
return 1;
}
}
if (strcmp(p, "*") == 0)
{
p = NULL;
}
//绑定多次,循环调用单个的绑定函数
ret |= server_socket(p, the_port, transport, portnumber_file);
}
free(list);
return ret;
}
}
//执行真正的绑定
static int server_socket(const char *interface, int port,
enum network_transport transport, FILE *portnumber_file)
{
int sfd;
struct linger ling = { 0, 0 };
struct addrinfo *ai;
struct addrinfo *next;
//设定协议无关,用于监听的标志位
struct addrinfo hints = { .ai_flags = AI_PASSIVE,.ai_family = AF_UNSPEC };
char port_buf[NI_MAXSERV];
int error;
int success = 0;
int flags = 1;
//指定socket的类型,如果是udp,则用数据报协议,如果是tcp,则用数据流协议
hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
if (port == -1)
{
port = 0;
}
snprintf(port_buf, sizeof(port_buf), "%d", port);
//调用getaddrinfo,将主机地址和端口号映射成为socket地址信息,地址信息由ai带回
error = getaddrinfo(interface, port_buf, &hints, &ai);
if (error != 0)
{
if (error != EAI_SYSTEM)
fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
else
perror("getaddrinfo()");
return 1;
}
/*getaddrinfo返回多个addrinfo的情形有如下两种:
1.如果与interface参数关联的地址有多个,那么适用于所请求地址簇的每个地址都返回一个对应的结构。
2.如果port_buf参数指定的服务支持多个套接口类型,那么每个套接口类型都可能返回一个对应的结构。
*/
for (next = ai; next; next = next->ai_next)
{
conn *listen_conn_add;
//为每个地址信息建立socket
if ((sfd = new_socket(next)) == -1)
{
//建立socket过程中可能发生的,比如打开文件描述符过多等
if (errno == EMFILE)
{
perror("server_socket");
exit(EX_OSERR);
}
continue;
}
#ifdef IPV6_V6ONLY
if (next->ai_family == AF_INET6)
{
//设定IPV6的选项值,设置了IPV6_V6ONLY,表示只收发IPV6的数据包,此时IPV4和IPV6可以绑定到同一个端口而不影响数据的收发
error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flags,
sizeof(flags));
if (error != 0)
{
perror("setsockopt");
close(sfd);
continue;
}
}
#endif
//设定socket选项,SO_REUSEADDR表示重用地址信息,具体重用哪些东西自行学习,必须在bind操作之前设置
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
if (IS_UDP(transport)){
maximize_sndbuf(sfd);//扩大发送缓冲区
}
else
{
//设定socket选项,SO_KEEPALIVE表示保活
error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
sizeof(flags));
if (error != 0)
perror("setsockopt");
//设定socket选项,SO_LINGER表示执行close操作时,如果缓冲区还有数据,可以继续发送
error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling,
sizeof(ling));
if (error != 0)
perror("setsockopt");
//设定IP选项,TCP_NODELAY表示禁用Nagle算法
error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
sizeof(flags));
if (error != 0)
perror("setsockopt");
}
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1)
{
if (errno != EADDRINUSE)
{
perror("bind()");
close(sfd);
freeaddrinfo(ai);
return 1;
}
close(sfd);
continue;
}
else
{
success++;
//如果不是UDP协议,则执行监听操作,监听队列为初始启动的值
if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1)
{
perror("listen()");
close(sfd);
freeaddrinfo(ai);
return 1;
}
if (portnumber_file != NULL
&& (next->ai_addr->sa_family == AF_INET
|| next->ai_addr->sa_family == AF_INET6))
{
union
{
struct sockaddr_in in;
struct sockaddr_in6 in6;
} my_sockaddr;
socklen_t len = sizeof(my_sockaddr);
//这时还没连接建立,调用getsockname不知道有什么用? if (getsockname(sfd, (struct sockaddr*) &my_sockaddr, &len) == 0) {
if (next->ai_addr->sa_family == AF_INET)
{
fprintf(portnumber_file, "%s INET: %u\n",
IS_UDP(transport) ? "UDP" : "TCP",
ntohs(my_sockaddr.in.sin_port));
}
else
{
fprintf(portnumber_file, "%s INET6: %u\n",
IS_UDP(transport) ? "UDP" : "TCP",
ntohs(my_sockaddr.in6.sin6_port));
}
}
}
}
if (IS_UDP(transport))
{
int c;
for (c = 0; c < settings.num_threads_per_udp; c++)
{
//分发连接,因为UDP没有连接建立的过程,直接进行连接的分发
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, transport);
}
}
else
{
//TCP,建立连接
if (!(listen_conn_add = conn_new(sfd, conn_listening, EV_READ |
EV_PERSIST, 1, transport, main_base)))
{
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
//建立的连接组成链表
listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;
//释放资源
freeaddrinfo(ai);
return success == 0;
}
//建立socket
static int new_socket(struct addrinfo *ai)
{
int sfd;
int flags;
//调用系统函数建立socket
if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
{
return -1;
}
//设定socket为非阻塞的
if ((flags = fcntl(sfd, F_GETFL, 0)) < 0
|| fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)
{
perror("setting O_NONBLOCK");
close(sfd);
return -1;
}
return sfd;
}
//如果是UDP协议,调整发送缓存到最大值
static void maximize_sndbuf(const int sfd)
{
socklen_t intsize = sizeof(int);
int last_good = 0;
int min, max, avg;
int old_size;
//读取socket的选项,SO_SNDBF表示发送缓存
if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
{
if (settings.verbose > 0)
perror("getsockopt(SO_SNDBUF)");
return;
}
//二分搜索来设定,很巧的设计
min = old_size;
max = MAX_SENDBUF_SIZE;
while (min <= max)
{
avg = ((unsigned int)(min + max)) / 2;
if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
{
last_good = avg;
min = avg + 1;
}
else
{
max = avg - 1;
}
}
if (settings.verbose > 1)
fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size,
last_good);
}
至此,网络相关的部分已经完成,后向连接的建立(conn_new
)和连接分发(dispatch_conn_new
),我们放到其他文章中进行分析。