Memcached源码阅读十七 状态机

Memcached源码阅读十七 状态机 按我们之前的描述,Master线程建立连接之后,分发给Worker线程,而Worker线程处理业务逻辑时,会进入状态机,状态机按不同的状态处理业务逻辑,我们在分析连接分发时,已经看到了Master线程进入状态机时在有新连接建立的时候,后续的状态都是业务逻辑的状态,其处理流程如下图所示: 共有10个状态(代码中的状态不止这些,有些没什么用,此处就没展现),状态listenning状态是Master建立连接的过程,我们已经分析过了,我们接下来分不同的文章分析其余的9中状态。 enum conn_states { conn_listening, //监听状态 conn_new_cmd, //为新连接做一些准备 conn_waiting, //等待读取一个数据包 conn_read, //读取网络数据 conn_parse_cmd, //解析缓冲区的数据 conn_write, //简单的回复数据 conn_nread, //读取固定数据的网络数据 conn_swallow, //处理不需要的写缓冲区的数据 conn_closing, //关闭连接 conn_mwrite, //顺序的写多个item数据 conn_max_state //最大状态,做断言使用 }; 这篇文件先分析conn_new_cmd和conn_wating状态,子线程最初进入的状态就是conn_new_cmd状态,这个状态主要是做一些清理。 case conn_new_cmd: //全局变量,记录每个libevent实例处理的事件,通过初始启动参数配置 --nreqs; //还可以处理请求 if (nreqs >= 0) { //整理缓冲区 reset_cmd_handler(c); } //拒绝请求 else { pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.conn_yields++;//更新统计数据 pthread_mutex_unlock(&c->thread->stats.mutex); //如果缓冲区有数据,则需要处理 if (c->rbytes > 0) { //更新libevent状态 if (!update_event(c, EV_WRITE | EV_PERSIST)) { if (settings.verbose > 0) fprintf(stderr, "Couldn't update event\n"); conn_set_state(c, conn_closing);//关闭连接 } } stop = true; } break; //整理缓冲区 static void reset_cmd_handler(conn *c) { c->cmd = -1; c->substate = bin_no_state; //还有item if (c->item != NULL) { //删除item,本篇不分析其实现,后续分析 item_remove(c->item); c->item = NULL; } //整理缓冲区 conn_shrink(c); //缓冲区还有数据 if (c->rbytes > 0) { //更新状态 conn_set_state(c, conn_parse_cmd); } //如果没有数据 else { //进入等待状态,状态机没有数据要处理,就进入这个状态 conn_set_state(c, conn_waiting); } } //缩小缓冲区 static void conn_shrink(conn *c) { assert(c !...

January 11, 2021

Memcached源码阅读十三 do_item_alloc操作

Memcached源码阅读十三 do_item_alloc操作 前面我们分析了Memcached的set操作,其set操作在经过所有的数据有效性检查之后,如果需要存储item,则会执行item的实际存储操作,我们下面分析下其过程。 //执行item的存储操作,该操作会将item挂载到LRU表和slabcalss中 item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes, const uint32_t cur_hv) { uint8_t nsuffix; item *it = NULL; char suffix[40]; //计算item的总大小(空间) size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix); //如果使用了cas if (settings.use_cas) { //增加cas的空间 ntotal += sizeof(uint64_t); } unsigned int id = slabs_clsid(ntotal); //那大小选择合适的slab if (id == 0) return 0; //执行LRU锁 mutex_lock(&cache_lock); //存储时,会尝试从LRU中选择合适的空间的空间 int tries = 5; //如果LRU中尝试5次还没合适的空间,则执行申请空间的操作 int tried_alloc = 0; item *search; void *hold_lock = NULL; //初始化时选择的过期时间 rel_time_t oldest_live = settings.oldest_live; search = tails[id];//第id个LRU表的尾部 for (; tries > 0 && search != NULL; tries--, search=search->prev) { uint32_t hv = hash(ITEM_key(search), search->nkey, 0);//获取分段锁 //尝试执行锁操作,这里执行的乐观锁 if (hv !...

January 11, 2021

Memcached源码阅读十二 set操作

Memcached源码阅读十二 set操作 之前分析了Memcached的get操作,下面分析set操作的流程。 //存储item enum store_item_type store_item(item *item, int comm, conn* c) { enum store_item_type ret; uint32_t hv; hv = hash(ITEM_key(item), item->nkey, 0);//获取Hash表的分段锁 item_lock(hv);//执行数据同步 ret = do_store_item(item, comm, c, hv);//存储item item_unlock(hv); return ret; } //存储item enum store_item_type do_store_item(item *it, int comm, conn *c,const uint32_t hv) { char *key = ITEM_key(it);//读取item对应的key item *old_it = do_item_get(key, it->nkey, hv); //读取相应的item,如果没有相关的数据,old_it为NULL enum store_item_type stored = NOT_STORED;//item状态标记 item *new_it = NULL; int flags; //如果old_it不为NULL,且操作为add操作 if (old_it != NULL && comm == NREAD_ADD) { do_item_update(old_it);//更新数据 } //old_it为空,且操作为REPLACE,则什么都不做 else if (!old_it && (comm == NREAD_REPLACE || comm == NREAD_APPEND || comm == NREAD_PREPEND)) { //memcached的Replace操作是替换已有的数据,如果没有相关数据,则不做任何操作 } //以cas方式读取 else if (comm == NREAD_CAS) { if (old_it == NULL) //为空 { // LRU expired stored = NOT_FOUND;//修改状态 pthread_mutex_lock(&c->thread->stats....

January 11, 2021

Memcached源码阅读十六 线程交互

Memcached源码阅读十六 线程交互 Memcached按之前的分析可以知道,其是典型的Master-Worker线程模型,这种模型很典型,其工作模型是Master绑定端口,监听网络连接,接受网络连接之后,通过线程间通信来唤醒Worker线程,Worker线程已经连接的描述符执行读写操作,这种模型简化了整个通信模型,下面分析下这个过程。 case conn_listening: addrlen = sizeof(addr); //Master线程(main)进入状态机之后执行accept操作,这个操作也是非阻塞的。 if ((sfd = accept(c->sfd, (struct sockaddr *) &addr, &addrlen)) == -1) { //非阻塞模型,这个错误码继续等待 if (errno == EAGAIN || errno == EWOULDBLOCK) { stop = true; } //连接超载 else if (errno == EMFILE) { if (settings.verbose > 0) fprintf(stderr, "Too many open connections\n"); accept_new_conns(false); stop = true; } else { perror("accept()"); stop = true; } break; } //已经accept成功,将accept之后的描述符设置为非阻塞的 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); close(sfd); break; } //判断是否超过最大连接数 if (settings.maxconns_fast && stats.curr_conns + stats.reserved_fd >= settings.maxconns - 1) { str = "ERROR Too many open connections\r\n"; res = write(sfd, str, strlen(str)); close(sfd); STATS_LOCK(); stats....

January 11, 2021

Memcached源码阅读十四 item结构

Memcached源码阅读十四 item结构 item是Memcached中抽象实际数据的结构,我们分析下item的一些特性,便于后续Memcached的其他特性分析。 typedef struct _stritem { struct _stritem *next;//item在slab中存储时,是以双链表的形式存储的,next即后向指针 struct _stritem *prev;//prev为前向指针 struct _stritem *h_next;//Hash桶中元素的链接指针 rel_time_t time;//最近访问时间 rel_time_t exptime;//过期时间 int nbytes;//数据大小 unsigned short refcount;//引用次数 uint8_t nsuffix;//不清楚什么意思? uint8_t it_flags;//不清楚什么意思? uint8_t slabs_clsid;//标记item属于哪个slabclass下 uint8_t nkey;//key的长度 union { uint64_t cas; char end; } data[];//真实的数据信息 } item; 其结构图如下所示: Item由两部分组成,item的属性信息和item的数据部分,属性信息解释如上,数据部分包括cas,key和真实的value信息,item在内存中的存储形式如下: 这个图画出了部分结构,还有Hash表的结构没有画出。 这里大概介绍了item的一些信息,后面我们会分析item插入Hash表等信息。

January 11, 2021

Memcached源码阅读四 内存初始化

Memcached源码阅读四 内存初始化 Memcached作为内存cache服务器,内存高效管理是其最重要的任务之一,Memcached使用SLAB管理其内存,SLAB内存管理直观的解释就是分配一块大的内存,之后按不同的块(48byte, 64byte, … 1M)等切分这些内存,存储业务数据时,按需选择合适的内存空间存储数据。 Memcached首次默认分配64M的内存,之后所有的数据都是在这64M空间进行存储,在Memcached启动之后,不会对这些内存执行释放操作,这些内存只有到Memcached进程退出之后会被系统回收,下面分析下Memcached的内存初始化过程。 //内存初始化,settings.maxbytes是Memcached初始启动参数指定的内存值大小,settings.factor是内存增长因子 slabs_init(settings.maxbytes, settings.factor, preallocate); #define POWER_SMALLEST 1 //最小slab编号 #define POWER_LARGEST 200 //首次初始化200个slab //实现内存池管理相关的静态全局变量 static size_t mem_limit = 0;//总的内存大小 static size_t mem_malloced = 0;//初始化内存的大小,这个貌似没什么用 static void *mem_base = NULL;//指向总的内存的首地址 static void *mem_current = NULL;//当前分配到的内存地址 static size_t mem_avail = 0;//当前可用的内存大小 static slabclass_t slabclass[MAX_NUMBER_OF_SLAB_CLASSES];//定义slab结合,总共200个 void slabs_init(const size_t limit, const double factor, const bool prealloc) { int i = POWER_SMALLEST - 1; //size表示申请空间的大小,其值由配置的chunk_size和单个item的大小来指定 unsigned int size = sizeof(item) + settings.chunk_size; mem_limit = limit;//mem_limit是全局变量 if (prealloc) { //支持预分配 mem_base = malloc(mem_limit);//申请地址,mem_base指向申请的地址 if (mem_base != NULL) { //mem_current指向当前地址 mem_current = mem_base; //可用内存大小为mem_limit mem_avail = mem_limit; } else { //支持预分配失败 fprintf(stderr, "Warning: Failed to allocate requested memory in" " one large chunk....

January 11, 2021

Memcached源码阅读序 服务器资源调整

Memcached源码阅读序 服务器资源调整 本篇作为Memcached源码分析的开篇,这次阅读的源码版本为: 1.4.15,开源软件各个版本之间差异比较大,同学们学习时,记得核对版本。 memcached的main函数位于memcached.c文件中,从main函数启动之后,会初始化一些资源和申请一些服务器资源,如下面所示: 1 Core文件大小和进程打开文件个数限制的调整。 if (maxcore != 0) { struct rlimit rlim_new; //获取当前Core文件大小的配置值 if (getrlimit(RLIMIT_CORE, &rlim) == 0) { //变量初始化为无限制 rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY; if (setrlimit(RLIMIT_CORE, &rlim_new) != 0)//如果设置失败 { //变量初始化为当前值的最大值 rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max; (void) setrlimit(RLIMIT_CORE, &rlim_new);//重新进行设置 } } //再次确认Core文件允许的大小,如果当前的Core文件的大小为0,则不允许Core文件产生,和maxcore!=0不符,程序结束 if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) { fprintf(stderr, "failed to ensure corefile creation\n"); exit(EX_OSERR); } } //读取进程允许打开的文件数信息,读取失败,程序退出 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) { fprintf(stderr, "failed to getrlimit number of files\n"); exit(EX_OSERR); } else { //按memcached启动时的指定的最大连接数进行设置 rlim.rlim_cur = settings.maxconns; rlim.rlim_max = settings.maxconns; if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) { fprintf(stderr, "failed to set rlimit for open files....

January 11, 2021

Memcached阅读十五 Hash表扩容

Memcached阅读十五 Hash表扩容 Hash表是Memcached里面最重要的结构之一,其采用链接法来处理Hash冲突,当Hash表中的项太多时,也就是Hash冲突比较高的时候,Hash表的遍历就脱变成单链表,此时为了提供Hash的性能,Hash表需要扩容,Memcached的扩容条件是当表中元素个数超过Hash容量的1.5倍时就进行扩容,扩容过程由独立的线程来完成,扩容过程中会采用2个Hash表,将老表中的数据通过Hash算法映射到新表中,每次移动的桶的数目可以配置,默认是每次移动老表中的1个桶。 //hash表中增加元素 int assoc_insert(item *it, const uint32_t hv) { unsigned int oldbucket; //如果已经进行扩容且目前进行扩容还没到需要插入元素的桶,则将元素添加到旧桶中 if (expanding &&(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket) { //添加元素 it->h_next = old_hashtable[oldbucket]; old_hashtable[oldbucket] = it; } else { //如果没扩容,或者扩容已经到了新的桶中,则添加元素到新表中 it->h_next = primary_hashtable[hv & hashmask(hashpower)];//添加元素 primary_hashtable[hv & hashmask(hashpower)] = it; } hash_items++;//元素数目+1 //还没开始扩容,且表中元素个数已经超过Hash表容量的1.5倍 if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) { //唤醒扩容线程 assoc_start_expand(); } MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items); return 1; } //唤醒扩容线程 static void assoc_start_expand(void) { if (started_expanding) return; started_expanding = true; //唤醒信号量 pthread_cond_signal(&maintenance_cond); } //启动扩容线程,扩容线程在main函数中会启动,启动运行一遍之后会阻塞在条件变量maintenance_cond上面,插入元素超过规定,唤醒条件变量 static void *assoc_maintenance_thread(void *arg) { //do_run_maintenance_thread的值为1,即该线程持续运行 while (do_run_maintenance_thread) { int ii = 0; item_lock_global();//加Hash表的全局锁 mutex_lock(&cache_lock);//加cache_lock锁 //执行扩容时,每次按hash_bulk_move个桶来扩容 for (ii = 0; ii < hash_bulk_move && expanding; ++ii) { item *it, *next; int bucket; //老表每次移动一个桶中的一个元素 for (it = old_hashtable[expand_bucket]; NULL !...

January 11, 2021

Part I

此仓库是公众号【高性能服务器开发】文章汇总,如需下载全部文章,可以在公众号回复关键字“文章下载”即可得到下载链接。分享和转发文章时请保留作者信息,部分文章来源于网络,侵权请联系删除。 我也专门建立了读者交流群,想加群的读者可以加我微信easy_coder 在线阅读站点1:https://balloonwj.github.io/cpp-guide-web/ 在线阅读站点2: http://balloonwj.gitee.io/cpp-guide-web/ 备份站点:http://101.37.25.166/blog/ 如需下载该站点源码用于自己搭建站点,可以在【高性能服务器开发】微信公众号后台回复关键字“站点下载”即可得到下载链接。 Part I C++必知必会的知识点 如何成为一名合格的C/C++开发者? 不定参数函数实现var_arg系列的宏 你一定要搞明白的C函数调用方式与栈原理 深入理解C/C++中的指针 详解C++11中的智能指针 C++17结构化绑定 C++必须掌握的pimpl惯用法 用Visual Studio调试Linux程序 如何使用Visual Studio管理和阅读开源项目代码 利用cmake工具生成Visual Studio工程文件 多线程 后台C++开发你一定要知道的条件变量 整型变量赋值是原子操作吗? 网络编程 bind 函数重难点解析 connect 函数在阻塞和非阻塞模式下的行为 select 函数重难点解析 Linux epoll 模型(含LT 模式和 ET 模式详解) socket 的阻塞模式和非阻塞模式 非阻塞模式下 send 和 recv 函数的返回值 服务器开发通信协议设计介绍 TCP 协议如何解决粘包、半包问题 网络通信中收发数据的正确姿势 服务器端发数据时,如果对端一直不收,怎么办? 程序员必知必会的网络命令 利用telnet命令发电子邮件 做Java或者C++开发都应该知道的lsof命令 Linux网络故障排查的瑞士军刀nc命令 Linux tcpdump使用详解 从抓包的角度分析connect函数的连接过程 服务器开发中网络数据分析与故障排查经验漫谈 Part II 高性能服务器框架设计 主线程与工作线程的分工 Reactor模式 实例:一个服务器程序的架构介绍 错误码系统的设计 日志系统的设计 如何设计断线自动重连机制 心跳包机制设计详解 业务数据处理一定要单独开线程吗 C++ 高性能服务器网络框架设计细节 服务器开发案例实战 从零实现一个http服务器 从零实现一款12306刷票软件 从零实现一个邮件收发客户端 从零开发一个WebSocket服务器 从零学习开源项目系列(一)从一款多人联机实时对战游戏开始 从零学习开源项目系列(二)最后一战概况 从零学习开源项目系列(三) CSBattleMgr服务源码研究 从零学习开源项目系列(四)LogServer源码探究 Part III TeamTalk IM源码分析 01 TeamTalk介绍 02 服务器端的程序的编译与部署 03 服务器端的程序架构介绍 04 服务器端db_proxy_server源码分析 05 服务器端msg_server源码分析 06 服务器端login_server源码分析 07 服务器端msfs源码分析 08 服务器端file_server源码分析 09 服务器端route_server源码分析 10 开放一个TeamTalk测试服务器地址和几个测试账号 11 pc客户端源码分析 libevent源码深度剖析...

January 11, 2021

pimpl 惯用法

pimpl 惯用法 现在这里有一个名为 CSocketClient 的网络通信类,定义如下: /** * 网络通信的基础类, SocketClient.h * zhangyl 2017.07.11 */ class CSocketClient { public: CSocketClient(); ~CSocketClient(); public: void SetProxyWnd(HWND hProxyWnd); bool Init(CNetProxy* pNetProxy); bool Uninit(); int Register(const char* pszUser, const char* pszPassword); void GuestLogin(); BOOL IsClosed(); BOOL Connect(int timeout = 3); void AddData(int cmd, const std::string& strBuffer); void AddData(int cmd, const char* pszBuff, int nBuffLen); void Close(); BOOL ConnectServer(int timeout = 3); BOOL SendLoginMsg(); BOOL RecvLoginMsg(int& nRet); BOOL Login(int& nRet); private: void LoadConfig(); static UINT CALLBACK SendDataThreadProc(LPVOID lpParam); static UINT CALLBACK RecvDataThreadProc(LPVOID lpParam); bool Send(); bool Recv(); bool CheckReceivedData(); void SendHeartbeatPackage(); private: SOCKET m_hSocket; short m_nPort; char m_szServer[64]; long m_nLastDataTime; //最近一次收发数据的时间 long m_nHeartbeatInterval; //心跳包时间间隔,单位秒 CRITICAL_SECTION m_csLastDataTime; //保护m_nLastDataTime的互斥体 HANDLE m_hSendDataThread; //发送数据线程 HANDLE m_hRecvDataThread; //接收数据线程 std::string m_strSendBuf; std::string m_strRecvBuf; HANDLE m_hExitEvent; bool m_bConnected; CRITICAL_SECTION m_csSendBuf; HANDLE m_hSemaphoreSendBuf; HWND m_hProxyWnd; CNetProxy* m_pNetProxy; int m_nReconnectTimeInterval; //重连时间间隔 time_t m_nLastReconnectTime; //上次重连时刻 CFlowStatistics* m_pFlowStatistics; }; 这段代码来源于笔者实际项目中开发的一个股票客户端的软件。...

January 11, 2021