后台C++开发你一定要知道的条件变量

  • 2021-01-11
  • 浏览 (338)

后台C++开发你一定要知道的条件变量

今天因为工作需要,需要帮同事用C语言(不是C++)写一个生产者消费者的任务队列工具库,考虑到不能使用任何第三库和C++的任何特性,所以我将任务队列做成一个链表,生产者在队列尾部加入任务,消费者在队列头部取出任务。很快就写好了,代码如下: /** * 线程池工具, ctrip_thread_pool.h * zhangyl 2018.03.23 */ #ifndef __CTRIP_THREAD_POOL_H__ #define __CTRIP_THREAD_POOL_H__ #include <pthread.h> #ifndef NULL #define NULL 0 #endif #define PUBLIC PUBLIC struct ctrip_task { struct ctrip_task* pNext; int value; }; struct ctrip_thread_info { //线程退出标志 int thread_running; int thread_num; int tasknum; struct ctrip_task* tasks; pthread_t* threadid; pthread_mutex_t mutex; pthread_cond_t cond; }; /* 初始化线程池线程数目 * @param thread_num 线程数目, 默认为8个 */ PUBLIC void ctrip_init_thread_pool(int thread_num); /* 销毁线程池 */ PUBLIC void ctrip_destroy_thread_pool(); /**向任务池中增加一个任务 * @param t 需要增加的任务 */ PUBLIC void ctrip_thread_pool_add_task(struct ctrip_task* t); /**从任务池中取出一个任务 * @return 返回得到的任务 */ struct ctrip_task* ctrip_thread_pool_retrieve_task(); /**执行任务池中的任务 * @param t 需要执行的任务 */ PUBLIC void ctrip_thread_pool_do_task(struct ctrip_task* t); /**线程函数 * @param thread_param 线程参数 */ void* ctrip_thread_routine(void* thread_param); #endif //!__CTRIP_THREAD_POOL_H__ /** * 线程池工具, ctrip_thread_pool.c * zhangyl 2018.03.23 */ #include "ctrip_thread_pool.h" #include <stdio.h> #include <stdlib.h> struct ctrip_thread_info g_threadinfo; int thread_running = 0; void ctrip_init_thread_pool(int thread_num) { if (thread_num <= 0) thread_num = 5; pthread_mutex_init(&g_threadinfo.mutex, NULL); pthread_cond_init(&g_threadinfo.cond, NULL); g_threadinfo.thread_num = thread_num; g_threadinfo.thread_running = 1; g_threadinfo.tasknum = 0; g_threadinfo.tasks = NULL; thread_running = 1; g_threadinfo.threadid = (pthread_t*)malloc(sizeof(pthread_t) * thread_num); int i; for (i = 0; i < thread_num; ++i) { pthread_create(&g_threadinfo.threadid[i], NULL, ctrip_thread_routine, NULL); } } void ctrip_destroy_thread_pool() { g_threadinfo.thread_running = 0; thread_running = 0; pthread_cond_broadcast(&g_threadinfo.cond); int i; for (i = 0; i < g_threadinfo.thread_num; ++i) { pthread_join(g_threadinfo.threadid[i], NULL); } free(g_threadinfo.threadid); pthread_mutex_destroy(&g_threadinfo.mutex); pthread_cond_destroy(&g_threadinfo.cond); } void ctrip_thread_pool_add_task(struct ctrip_task* t) { if (t == NULL) return; pthread_mutex_lock(&g_threadinfo.mutex); struct ctrip_task* head = g_threadinfo.tasks; if (head == NULL) g_threadinfo.tasks = t; else { while (head->pNext != NULL) { head = head->pNext; } head->pNext = t; } ++g_threadinfo.tasknum; //当有变化后,使用signal通知wait函数 pthread_cond_signal(&g_threadinfo.cond); pthread_mutex_unlock(&g_threadinfo.mutex); } struct ctrip_task* ctrip_thread_pool_retrieve_task() { struct ctrip_task* head = g_threadinfo.tasks; if (head != NULL) { g_threadinfo.tasks = head->pNext; --g_threadinfo.tasknum; printf("retrieve a task, task value is [%d]\n", head->value); return head; } printf("no task\n"); return NULL; } void* ctrip_thread_routine(void* thread_param) { printf("thread NO.%d start.\n", (int)pthread_self()); while (thread_running/*g_threadinfo.thread_running*/) { struct ctrip_task* current = NULL; pthread_mutex_lock(&g_threadinfo.mutex); while (g_threadinfo.tasknum <= 0) { //如果获得了互斥锁,但是条件不合适的话,wait会释放锁,不往下执行。 //当变化后,条件合适,将直接获得锁。 pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex); if (!g_threadinfo.thread_running) break; current = ctrip_thread_pool_retrieve_task(); if (current != NULL) break; }// end inner-while-loop pthread_mutex_unlock(&g_threadinfo.mutex); ctrip_thread_pool_do_task(current); }// end outer-while-loop printf("thread NO.%d exit.\n", (int)pthread_self()); } void ctrip_thread_pool_do_task(struct ctrip_task* t) { if (t == NULL) return; //TODO: do your work here printf("task value is [%d]\n", t->value); //TODO:如果t需要释放,记得在这里释放 } 测试代码如下: // ctrip_thread_pool.cpp : Defines the entry point for the console application. // //#include "stdafx.h" #include "ctrip_thread_pool.h" #include <stdlib.h> #include <unistd.h> int main(int argc, char* argv[]) { ctrip_init_thread_pool(5); struct ctrip_task* task = NULL; int i; for (i = 0; i < 100; ++i) { task = (struct ctrip_task*)malloc(sizeof(struct ctrip_task)); task->value = i + 1; task->pNext = NULL; printf("add task, task value [%d]\n", task->value); ctrip_thread_pool_add_task(task); } sleep(10); ctrip_destroy_thread_pool(); return 0; } 代码很快就写好了,但是每次程序只能执行前几个加到任务池子里面的任务,导致池子有不少任务积累在池子里面。甚是奇怪,我也看了半天才看出结果。聪明的你,能看出上述代码为啥只能执行加到池子里面的前几个任务?先不要看答案,自己想一会儿。 linux条件变量是做后台开发必须熟练掌握的基础知识,而条件变量使用存在以下几个非常让人迷惑的地方,讲解如下 第一、必须要结合一个互斥体一起使用。使用流程如下: pthread_mutex_lock(&g_threadinfo.mutex) pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex); pthread_mutex_unlock(&g_threadinfo.mutex); 上面的代码,我们分为一二三步,当条件不满足是pthread_cond_wait会挂起线程,但是不知道你有没有注意到,如果在第二步挂起线程的话,第一步的mutex已经被上锁,谁来解锁?mutex的使用原则是谁上锁谁解锁,所以不可能在其他线程来给这个mutex解锁,但是这个线程已经挂起了,这就死锁了。所以pthread_cond_wait在挂起之前,额外做的一个事情就是给绑定的mutex解锁。反过来,如果条件满足,pthread_cond_wait不挂起线程,pthread_cond_wait将什么也不做,这样就接着走pthread_mutex_unlock解锁的流程。而在这个加锁和解锁之间的代码就是我们操作受保护资源的地方。 第二,不知道你有没有注意到pthread_cond_wait是放在一个while循环里面的: pthread_mutex_lock(&g_threadinfo.mutex); while (g_threadinfo.tasknum <= 0) { //如果获得了互斥锁,但是条件不合适的话,wait会释放锁,不往下执行。 //当变化后,条件合适,将直接获得锁。 pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex); if (!g_threadinfo.thread_running) break; current = ctrip_thread_pool_retrieve_task(); if (current != NULL) break; }// end inner-while-loop pthread_mutex_unlock(&g_threadinfo.mutex); 注意,我说的是内层的while循环,不是外层的。pthread_cond_wait一定要放在一个while循环里面吗?一定要的。这里有一个非常重要的关于条件变量的基础知识,叫条件变量的虚假唤醒(spurious wakeup),那啥叫条件变量的虚假唤醒呢?假设pthread_cond_wait不放在这个while循环里面,正常情况下,pthread_cond_wait因为条件不满足,挂起线程。然后,外部条件满足以后,调用pthread_cond_signal或pthread_cond_broadcast来唤醒挂起的线程。这没啥问题。但是条件变量可能在某些情况下也被唤醒,这个时候pthread_cond_wait处继续往下执行,但是这个时候,条件并不满足(比如任务队列中仍然为空)。这种唤醒我们叫“虚假唤醒”。为了避免虚假唤醒时,做无意义的动作,我们将pthread_cond_wait放到while循环条件中,这样即使被虚假唤醒了,由于while条件(比如任务队列是否为空,资源数量是否大于0)仍然为true,导致线程进行继续挂起。有人说条件变量是最不可能用错的线程之间同步技术,我却觉得这是最容易使用错误的线程之间同步技术。 上述代码存在的问题是,只考虑了任务队列开始为空,生产者后来添加了任务,条件变量被唤醒,然后消费者取任务执行的逻辑。假如一开始池中就有任务呢?这个原因导致,只有开始的几个添加到任务队列中任务被执行。因为一旦任务队列不为空。内层while循环条件将不再满足,导致消费者线程不再从任务队列中取任务消费。正确的代码如下: /** * 线程池工具, ctrip_thread_pool.c(修正后的代码) * zhangyl 2018.03.23 */ #include "ctrip_thread_pool.h" #include <stdio.h> #include <stdlib.h> struct ctrip_thread_info g_threadinfo; void ctrip_init_thread_pool(int thread_num) { if (thread_num <= 0) thread_num = 5; pthread_mutex_init(&g_threadinfo.mutex, NULL); pthread_cond_init(&g_threadinfo.cond, NULL); g_threadinfo.thread_num = thread_num; g_threadinfo.thread_running = 1; g_threadinfo.tasknum = 0; g_threadinfo.tasks = NULL; g_threadinfo.threadid = (pthread_t*)malloc(sizeof(pthread_t) * thread_num); int i; for (i = 0; i < thread_num; ++i) { pthread_create(&g_threadinfo.threadid[i], NULL, ctrip_thread_routine, NULL); } } void ctrip_destroy_thread_pool() { g_threadinfo.thread_running = 0; pthread_cond_broadcast(&g_threadinfo.cond); int i; for (i = 0; i < g_threadinfo.thread_num; ++i) { pthread_join(g_threadinfo.threadid[i], NULL); } free(g_threadinfo.threadid); pthread_mutex_destroy(&g_threadinfo.mutex); pthread_cond_destroy(&g_threadinfo.cond); } void ctrip_thread_pool_add_task(struct ctrip_task* t) { if (t == NULL) return; pthread_mutex_lock(&g_threadinfo.mutex); struct ctrip_task* head = g_threadinfo.tasks; if (head == NULL) g_threadinfo.tasks = t; else { while (head->pNext != NULL) { head = head->pNext; } head->pNext = t; } ++g_threadinfo.tasknum; //当有变化后,使用signal通知wait函数 pthread_cond_signal(&g_threadinfo.cond); pthread_mutex_unlock(&g_threadinfo.mutex); } struct ctrip_task* ctrip_thread_pool_retrieve_task() { struct ctrip_task* head = g_threadinfo.tasks; if (head != NULL) { g_threadinfo.tasks = head->pNext; --g_threadinfo.tasknum; printf("retrieve a task, task value is [%d]\n", head->value); return head; } printf("no task\n"); return NULL; } void* ctrip_thread_routine(void* thread_param) { printf("thread NO.%d start.\n", (int)pthread_self()); while (g_threadinfo.thread_running) { struct ctrip_task* current = NULL; pthread_mutex_lock(&g_threadinfo.mutex); while (g_threadinfo.tasknum <= 0) { //如果获得了互斥锁,但是条件不合适的话,wait会释放锁,不往下执行。 //当变化后,条件合适,将直接获得锁。 pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex); if (!g_threadinfo.thread_running) break; }// end inner-while-loop current = ctrip_thread_pool_retrieve_task(); pthread_mutex_unlock(&g_threadinfo.mutex); ctrip_thread_pool_do_task(current); }// end outer-while-loop printf("thread NO.%d exit.\n", (int)pthread_self()); } void ctrip_thread_pool_do_task(struct ctrip_task* t) { if (t == NULL) return; //TODO: do your work here printf("task value is [%d]\n", t->value); //TODO:如果t需要释放,记得在这里释放 } ok,不知道你有没有看明白呀?