Memcached源码分析三 网络连接建立

Memcached源码分析三 网络连接建立 接着上一篇继续分析,上一篇请参考 《Memcached源码阅读之网络监听的建立》,这篇主要分析TCP的连接建立(从前面的代码分析可以看出,这个过程是由主线程驱动的),UDP没有连接建立的过程,所以之间进行连接分发,我们后续分析,现在直接上代码进行讲解。 conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base) { conn *c = conn_from_freelist(); //获取一个空闲连接,conn是Memcached内部对网络连接的一个封装 //如果没有空闲的连接 if (NULL == c) { if (!(c = (conn *)calloc(1, sizeof(conn))))//申请空间 { fprintf(stderr, "calloc()\n"); return NULL; }MEMCACHED_CONN_CREATE(c); //进行一些初始化 c->rbuf = c->wbuf = 0; c->ilist = 0; c->suffixlist = 0; c->iov = 0; c->msglist = 0; c->hdrbuf = 0; c->rsize = read_buffer_size; c->wsize = DATA_BUFFER_SIZE; c->isize = ITEM_LIST_INITIAL; c->suffixsize = SUFFIX_LIST_INITIAL; c->iovsize = IOV_LIST_INITIAL; c->msgsize = MSG_LIST_INITIAL; c->hdrsize = 0; //每个conn都自带读入和输出缓冲区,在进行网络收发数据时,特别方便 c->rbuf = (char *)malloc((size_t)c->rsize); c->wbuf = (char *)malloc((size_t)c->wsize); c->ilist = (item **)malloc(sizeof(item *) * c->isize); c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize); c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize); c->msglist = (struct msghdr *) malloc( sizeof(struct msghdr) * c->msgsize); if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 || c->msglist == 0 || c->suffixlist == 0) { conn_free(c); fprintf(stderr, "malloc()\n"); return NULL; } STATS_LOCK(); //统计变量更新 stats....

January 11, 2021

Memcached源码阅读一 初始化参数解析

Memcached源码阅读一 初始化参数解析 Memcached启动时,有很多配置参数可以选择,这些配置参数严重影响着Memcached的使用,下面分析下这些参数的意义,开源软件版本之间差异比较大,我这次分析是基于1.4.15进行分析的,大家学习时记得核对版本。 "a:" //unix socket的权限位信息,unix socket的权限位信息和普通文件的权限位信息一样 "p:" //memcached监听的TCP端口值,默认是11211 "s:" //unix socket监听的socket文件路径 "U:" //memcached监听的UDP端口值,默认是11211 "m:" //memcached使用的最大内存值,默认是64M "M" //当memcached的内存使用完时,不进行LRU淘汰数据,直接返回错误,该选项就是关闭LRU "c:" //memcached的最大连接数,如果不指定,按系统的最大值进行 "k" //是否锁定memcached所持有的内存,如果锁定了内存,其他业务持有的内存就会减小 "hi" //帮助信息 "r" //core文件的大小,如果不指定,按系统的最大值进行 "v" //调试信息 "d" //设定以daemon方式运行 "l:" //绑定的ip信息,如果服务器有多个ip,可以在多个ip上面启动多个Memcached实例,注意:这个不是可接收的IP地址 "u:" //memcached运行的用户,如果以root启动,需要指定用户,否则程序错误,退出。 "P:" //memcached以daemon方式运行时,保存pid的文件路径信息 "f:" //内存的扩容因子,这个关系到Memcached内部初始化空间时的一个变化,后面详细说明 "n:" //chunk的最小大小(byte),后续的增长都是该值*factor来进行增长的 "t:" //内部worker线程的个数,默认是4个,最大值推荐不超过64个 "D:" //内部数据存储时的分割符 "L" //指定内存页的大小,默认内存页大小为4K,页最大不超过2M,调大页的大小,可有效减小页表的大小,提高内存访问的效率 "R:" //单个worker的最大请求个数 "C" //禁用业务的cas,即compare and set "b:" //listen操作缓存连接个数 "B:" //memcached内部使用的协议,支持二进制协议和文本协议,早期只有文本协议,二进制协议是后续加上的 "I:" //单个item的最大值,默认是1M,可以修改,修改的最小值为1k,最大值不能超过128M "S" //打开sasl安全协议 "o:" /** *有四个参数项可以设置: *maxconns_fast(如果连接数超过最大连接数,立即关闭新的连接) *hashpower(hash表的大小的指数值,是按1<<hashpower来创建hash表的,默认的hashpower为16,配置值建议不超过64) *slab_reassign(是否调整/平衡各个slab所占的内存) *slab_automove(是否自动移动各个slab,如果该选项打开,会有专门的线程来进行slab的调整) */ Memcached内部是通过settings来抽象上面的这些初始化参数。 struct settings { size_t maxbytes; int maxconns; int port; int udpport; char* inter; int verbose; rel_time_t oldest_live; /* ignore existing items older than this */ int evict_to_free; char* socketpath; /* path to unix socket if using local socket */ int access; /* access mask (a la chmod) for unix domain socket */ double factor; /* chunk size growth factor */ int chunk_size; int num_threads; /* number of worker (without dispatcher) libevent threads to run */ int num_threads_per_udp; /* number of worker threads serving each udp socket */ char prefix_delimiter; /* character that marks a key prefix (for stats) */ int detail_enabled; /* nonzero if we're collecting detailed stats */ int reqs_per_event; /* Maximum number of io to process on each io-event....

January 11, 2021

Memcached源码阅读七 cas属性

Memcached源码阅读七 cas属性 cas即compare and set或者compare and swap,是实现乐观锁的一种技术,乐观锁是相对悲观锁来说的,所谓悲观锁是在数据处理过程中,完全锁定,这种能完全保证数据的一致性,但在多线程情况下,并发性能差,通常是使用各种锁技术实现;而乐观锁是通过版本号机制来实现数据一致性,过程中会使用CPU提供的原子操作指令,乐观锁能提高系统的并发性能,Memcached使用cas是保证数据的一致性,不是严格为了实现锁。 Memcached是多客户端应用,在多个客户端修改同一个数据时,会出现相互覆盖的情况,在这种情况下,使用cas版本号验证,可以有效的保证数据的一致性,Memcached默认是打开cas属性的,每次存储数据时,都会生成其cas值并和item一起存储,后续的get操作会返回系统生成的cas值,在执行set等操作时,需要将cas值传入,下面我们看看Memcached内部是如何实现cas的,关于如何使用Mecached的CAS协议,请参考文章:Memcached的CAS协议(链接:http://langyu.iteye.com/blog/680052)。 //为新的item生成cas值 uint64_t get_cas_id(void) { static uint64_t cas_id = 0; return ++cas_id; } //这段代码是store_item的代码片段,这里是执行cas存储时执行的判断逻辑, else if (ITEM_get_cas(it) == ITEM_get_cas(old_it))//cas值一致 { pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++; pthread_mutex_unlock(&c->thread->stats.mutex); item_replace(old_it, it, hv);//执行存储逻辑 stored = STORED; } //cas值不一致,不进行实际的存储 else { pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++; //更新统计信息 pthread_mutex_unlock(&c->thread->stats.mutex); if (settings.verbose > 1) { //打印错误日志 fprintf(stderr, "CAS: failure: expected %llu, got %llu\n", (unsigned long long) ITEM_get_cas(old_it), (unsigned long long) ITEM_get_cas(it)); } stored = EXISTS; }

January 11, 2021

Memcached源码阅读九 连接队列

Memcached源码阅读九 连接队列 Memcached中Master线程和Worker线程之间通信连接信息时,是通过连接队列来通信的,即Master线程投递一个消息到Worker线程的连接队列中,Worker线程从连接队列中读取链接信息来执行连接操作,下面我们简单分析下Memcached的连接队列结构。 typedef struct conn_queue_item CQ_ITEM;//每个连接信息的封装 struct conn_queue_item { int sfd;//accept之后的描述符 enum conn_states init_state;//连接的初始状态 int event_flags;//libevent标志 int read_buffer_size;//读取数据缓冲区大小 enum network_transport transport;//内部通信所用的协议 CQ_ITEM *next;//用于实现链表的指针 }; typedef struct conn_queue CQ;//连接队列的封装 struct conn_queue { CQ_ITEM *head;//头指针,注意这里是单链表,不是双向链表 CQ_ITEM *tail;//尾部指针, pthread_mutex_t lock;//锁 pthread_cond_t cond;//条件变量 }; //连接队列初始化 static void cq_init(CQ *cq) { pthread_mutex_init(&cq->lock, NULL);//初始化锁 pthread_cond_init(&cq->cond, NULL);//初始化条件变量 cq->head = NULL; cq->tail = NULL; } //获取一个连接 static CQ_ITEM *cq_pop(CQ *cq) { CQ_ITEM *item; pthread_mutex_lock(&cq->lock);//执行加锁操作 item = cq->head; //获得头部指针指向的数据 if (NULL != item) { //更新头指针信息 cq->head = item->next; //这里为空的话,则尾指针也为空,链表此时为空 if (NULL == cq->head) cq->tail = NULL; } //释放锁操作 pthread_mutex_unlock(&cq->lock); return item; } //添加一个连接信息 static void cq_push(CQ *cq, CQ_ITEM *item) { item->next = NULL; pthread_mutex_lock(&cq->lock);//执行加锁操作 //如果链表目前是空的 if (NULL == cq->tail) //则头指针指向该结点 cq->head = item; else cq->tail->next = item;//添加到尾部 cq->tail = item; //尾部指针后移 pthread_cond_signal(&cq->cond); //唤醒条件变量,如果有阻塞在该条件变量的线程,则会唤醒该线程 pthread_mutex_unlock(&cq->lock); } //创建连接队列 static CQ_ITEM *cqi_new(void) { CQ_ITEM *item = NULL; pthread_mutex_lock(&cqi_freelist_lock); //加锁,保持数据同步 if (cqi_freelist) { //更新空闲链表信息 item = cqi_freelist; cqi_freelist = item->next; } pthread_mutex_unlock(&cqi_freelist_lock); //如果空闲链表没有多余的链接 if (NULL == item) { int i; //初始化64个空闲连接信息 item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC); if (NULL == item) return NULL; //将空闲的连接信息进行链接 for (i = 2; i < ITEMS_PER_ALLOC; i++) item[i - 1]....

January 11, 2021

Memcached源码阅读二 网络监听的建立

Memcached源码阅读二 网络监听的建立 Memcahced是一个服务器程序,所以需要建立网络监听来接受其他客户端机器的连接,下面分析下其过程,这次分析是基于Memcached 1.4.15版本分析的。 // 如果socketpath为空,则表示使用的TCP / UDP, 不是使用unix socket //如果socketpath为空,则表示使用的TCP/UDP,不是使用unix socket if (settings.socketpath == NULL) { //可以从环境变量读取端口文件所在的文件路径 const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME"); char temp_portnumber_filename[PATH_MAX]; FILE *portnumber_file = NULL; //如果端口文件不为空,则打开 if (portnumber_filename != NULL) { snprintf(temp_portnumber_filename, sizeof(temp_portnumber_filename), "%s.lck", portnumber_filename); portnumber_file = fopen(temp_portnumber_filename, "a"); if (portnumber_file == NULL) { fprintf(stderr, "Failed to open \"%s\": %s\n", temp_portnumber_filename, strerror(errno)); } } //settings.port表示Memcached采用的是TCP协议,创建TCP Socket,监听并且绑定 errno = 0; if (settings.port && server_sockets(settings.port, tcp_transport, portnumber_file)) { vperror("failed to listen on TCP port %d", settings.port); exit(EX_OSERR); } //settings.udpport表示Memcached采用的是UDP协议,创建UDP Socket,监听并且绑定 errno = 0; if (settings.udpport && server_sockets(settings.udpport, udp_transport, portnumber_file)) { vperror("failed to listen on UDP port %d", settings.udpport); exit(EX_OSERR); } //端口文件不为空 if (portnumber_file) { fclose(portnumber_file);//关闭文件 rename(temp_portnumber_filename, portnumber_filename);//重命名端口文件 } } //TCP和UDP使用的是同一个接口来创建监听和绑定 static int server_sockets(int port, enum network_transport transport, FILE *portnumber_file) { //settings....

January 11, 2021

Memcached源码阅读五 资源初始化

Memcached源码阅读五 资源初始化 Memcached内部有hash表,各种统计信息,工作线程,网络,连接,内存结构等,在memcached启动时(执行main函数),会对这些资源进行初始化的,网络和内存的初始化操作放到后续分析,这次分析hash表,统计信息,工作线程,网络连接的初始化过程。 1 hash表的初始化 //hash表的初始化,传入的参数是启动时传入的 assoc_init(settings.hashpower_init); //hashsize的实现 #define hashsize(n) ((ub4)1<<(n)) //主hash表结构定义,在hash表扩容时,会有次hash表,所以有主次hash表区分,该结构是指针的指针,也即相当于数组指针 static item** primary_hashtable = 0; void assoc_init(const int hashtable_init) { if (hashtable_init) { //如果设置了初始化参数,则按设置的参数进行初始化 hashpower = hashtable_init; } //hashpower的默认值为16,如果未设置新值,则按默认值进行初始化 primary_hashtable = calloc(hashsize(hashpower), sizeof(void *)); if (! primary_hashtable) { fprintf(stderr, "Failed to init hashtable.\n"); exit(EXIT_FAILURE); } STATS_LOCK();//全局统计信息加锁,保证数据同步 stats.hash_power_level = hashpower; stats.hash_bytes = hashsize(hashpower) * sizeof(void *); STATS_UNLOCK(); } 2 统计信息的初始化 Memcached内部有很多全局的统计信息,用于实时获取各个资源的使用情况,后面将会看到,所有对统计信息的更新都需要加锁,而这些信息的更新是和Memcached的操作次数同数量级的,所以,在一定程度来说,这些统计信息对性能有影响。 stats结构是对统计信息的一个抽象,各个字段都比较好理解,不做解释。 struct stats { pthread_mutex_t mutex; unsigned int curr_items; unsigned int total_items; uint64_t curr_bytes; unsigned int curr_conns; unsigned int total_conns; uint64_t rejected_conns; unsigned int reserved_fds; unsigned int conn_structs; uint64_t get_cmds; uint64_t set_cmds; uint64_t touch_cmds; uint64_t get_hits; uint64_t get_misses; uint64_t touch_hits; uint64_t touch_misses; uint64_t evictions; uint64_t reclaimed; time_t started; /* when the process was started */ bool accepting_conns; /* whether we are currently accepting */ uint64_t listen_disabled_num; unsigned int hash_power_level; /* Better hope it's not over 9000 */ uint64_t hash_bytes; /* size used for hash tables */ bool hash_is_expanding; /* If the hash table is being expanded */ uint64_t expired_unfetched; /* items reclaimed but never touched */ uint64_t evicted_unfetched; /* items evicted but never touched */ bool slab_reassign_running; /* slab reassign in progress */ uint64_t slabs_moved; /* times slabs were moved around */ }; 统计信息的初始化也就是对stats变量的一个初始化。...

January 11, 2021

Memcached源码阅读八 内存池

Memcached源码阅读八 内存池 Memcached内部维护了一个内存池来减少频繁的malloc和free,在该内存池的基础上面实现了slab内存管理,下面简单介绍下内存池的实现,大家在实现类似结构时,可以做个参考。 static void *mem_base = NULL;//mem_base指向新申请的内存空间,指向整个内存空间的头部 static void *mem_current = NULL;//指向已经分配过的空间,且指向已经分配了空间的尾部 static size_t mem_avail = 0;//剩余空间大小 //部分初始化操作 mem_limit = limit;//初始容量 mem_base = malloc(mem_limit);//申请内存空间 if (mem_base != NULL) //如果不为空 { mem_current = mem_base; //当前还没分配,所以其指向为整个空间 mem_avail = mem_limit; //可用空间为满 } else { fprintf(stderr, "Warning: Failed to allocate requested memory in" " one large chunk.\nWill allocate in smaller chunks\n"); } //分配空间的过程,分配size大小的空间 static void *memory_allocate(size_t size) { void *ret; //如果未初始化 if (mem_base == NULL) { ret = malloc(size); } else { ret = mem_current; if (size > mem_avail) { return NULL; } //执行对齐操作 if (size % CHUNK_ALIGN_BYTES) { size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES); } mem_current = ((char*)mem_current) + size; if (size < mem_avail) { mem_avail -= size; } else { mem_avail = 0; } } return ret; }

January 11, 2021

Memcached源码阅读六 get过程

Memcached源码阅读六 get过程 我们在前面分析过,Memcached从网络读取完数据,解析数据,如果是get操作,则执行get操作,下面我们分析下get操作的流程。 //根据key信息和key的长度信息读取数据 item *item_get(const char *key, const size_t nkey) { item *it; uint32_t hv; hv = hash(key, nkey, 0);//获得分段锁信息,如果未进行扩容,则item的hash表是多个hash桶共用同一个锁,即是分段的锁 item_lock(hv);//执行分段加锁 it = do_item_get(key, nkey, hv);//执行get操作 item_unlock(hv);//释放锁 return it; } //执行分段加锁 void item_lock(uint32_t hv) { uint8_t *lock_type = pthread_getspecific(item_lock_type_key); if (likely(*lock_type == ITEM_LOCK_GRANULAR)) { mutex_lock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);//执行分段加锁 } else {//如果在扩容过程中 mutex_lock(&item_global_lock); } } //执行分段解锁 void item_unlock(uint32_t hv) { uint8_t *lock_type = pthread_getspecific(item_lock_type_key); if (likely(*lock_type == ITEM_LOCK_GRANULAR)) { mutex_unlock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);//释放分段锁 } else {//如果在扩容过程中 mutex_unlock(&item_global_lock); } } //执行读取操作 item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) { item *it = assoc_find(key, nkey, hv);//从Hash表中获取相应的结构 if (it !...

January 11, 2021

Memcached源码阅读十 Hash表操作

Memcached源码阅读十 Hash表操作 Memcached的Hash表用来提高数据访问性能,通过链接法来解决Hash冲突,当Hash表中数据多余Hash表容量的1.5倍时,Hash表就会扩容,Memcached的Hash表操作没什么特别的,我们这里简单介绍下Memcached里面的Hash表操作。 //hash表插入元素 int assoc_insert(item *it, const uint32_t hv) { unsigned int oldbucket; //如果已经开始扩容,且扩容的桶编号大于目前的item所在桶的编号 if (expanding && (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket) { //这里是类似单链表的,按单链表的操作进行插入 it->h_next = old_hashtable[oldbucket]; old_hashtable[oldbucket] = it; } else { //已经扩容,则按新的Hash规则进行路由 it->h_next = primary_hashtable[hv & hashmask(hashpower)]; //这里在新的Hash表中执行单链表插入 primary_hashtable[hv & hashmask(hashpower)] = it; } hash_items++;//元素个数+1 if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) { //开始扩容 assoc_start_expand();//唤醒扩容条件变量 } MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items); return 1; } //hash表删除元素 void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) { item **before = _hashitem_before(key, nkey, hv); //获得item对应的桶的前一个元素 if (*before) { item *nxt; hash_items--;//元素个数-1 MEMCACHED_ASSOC_DELETE(key, nkey, hash_items); nxt = (*before)->h_next;//执行单链表的删除操作 (*before)->h_next = 0; *before = nxt; return; } assert(*before !...

January 11, 2021

Memcached源码阅读十一 LRU操作

Memcached源码阅读十一 LRU操作 LRU是最近最少使用的简称,该技术经常用来实现cache数据更新,Memcached使用LRU技术来淘汰老的数据,Memcached默认是启用LRU操作的,在这种情况下所有的set操作都会成功,如果Memcached的内存池已经使用完,则会淘汰老数据来存放新数据,如果关闭了Memcached的LRU,则当Memcached没有多余的内存空间时,Memcached之间返回错误,下面我们分析下LRU的相关操作。 里面再附一张Memcached的内存结构图,从图中可以看到LRU队列保持这已经分配出去的item的结构(图中指针为单链表,这里画的有误,其实是双向链表),同时每个slabclass由两个指针来维护该表,即heads和tails指针,分别指向最老的数据和最新的数据,这样便于LRU链表的操作。 //每个slabclass各有一个指针 static item *heads[LARGEST_ID]; static item *tails[LARGEST_ID]; //将item加入到对应classid的LRU链的head,这里是item加入到LRU链表中 static void item_link_q(item *it) { /* item is the new head */ item **head, **tail; assert(it->slabs_clsid < LARGEST_ID); assert((it->it_flags & ITEM_SLABBED) == 0); head = &heads[it->slabs_clsid]; tail = &tails[it->slabs_clsid]; assert(it != *head); assert((*head && *tail) || (*head == 0 && *tail == 0)); it->prev = 0; it->next = *head; if (it->next) it->next->prev = it;//执行插入数据操作 *head = it; if (*tail == 0) *tail = it; sizes[it->slabs_clsid]++; return; } //将item从对应classid的LRU链上移除,这里是item从LRU链表中删除 static void item_unlink_q(item *it) { item **head, **tail; assert(it->slabs_clsid < LARGEST_ID); head = &heads[it->slabs_clsid]; tail = &tails[it->slabs_clsid]; if (*head == it) { assert(it->prev == 0); *head = it->next; } if (*tail == it) { assert(it->next == 0); *tail = it->prev; } assert(it->next !...

January 11, 2021