qemu里面有个服务于aio的线程池:
/* Per-AioContext pool of worker threads servicing aio requests. */
struct ThreadPool {
    AioContext *ctx;           /* event loop this pool belongs to */
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond check_cancel;
    QemuCond worker_stopped;   /* waited on during teardown until workers exit */
    QemuSemaphore sem;         /* posted once per submitted request; idle workers wait on it */
    int max_threads;
    QEMUBH *new_thread_bh;     /* bottom half that performs the actual thread creation */
    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;
    /* The following variables are protected by lock. */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;  /* queue of pending work items */
    int cur_threads;           /* total workers, including ones not yet started */
    int idle_threads;          /* workers currently blocked waiting for work */
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int pending_cancellations; /* whether we need a cond_broadcast */
    bool stopping;             /* set in thread_pool_free; tells workers to exit */
};
qemu在thread_pool_init_one建池子的时候注册了一个延迟执行的work:
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
这个work将在spawn_thread里面被安排调度
/*
 * Request one more worker thread.  Called with pool->lock held.
 *
 * new_threads counts how many spawn_thread() calls the bottom half still
 * has to satisfy: not every call schedules the BH, so the counter is what
 * tells the BH how many workers to create.
 */
static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        /* No creation already in flight: schedule the bottom half once. */
        qemu_bh_schedule(pool->new_thread_bh);
    }
}
当work被调度到的时候,执行spawn_thread_bh_fn
/* Bottom-half callback: create the backlog of worker threads under the
 * pool lock, which do_spawn_thread() expects to be held. */
static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *tp = opaque;

    qemu_mutex_lock(&tp->lock);
    do_spawn_thread(tp);
    qemu_mutex_unlock(&tp->lock);
}
/* Create one worker thread if any are still owed.  Runs with lock taken. */
static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread thread;

    if (!pool->new_threads) {
        /* Backlog drained; nothing left to create. */
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;  /* suppresses re-scheduling the BH from spawn_thread() */

    qemu_thread_create(&thread, "worker", worker_thread, pool,
                       QEMU_THREAD_DETACHED);
}
这里采用了递归的方式来一次性创建多个worker：新创建的worker线程会再次调用do_spawn_thread来创建下一个worker，直到发现new_threads为0
这里有个问题：pending_threads本应由新worker线程启动后递减，如果qemu_thread_create创建新线程失败，就没有worker去递减它，pending_threads会一直保持非0。于是前面spawn_thread里的`if (!pool->pending_threads)`判断恒为假，不会再安排新的下半部工作，后面新的线程就永远无法创建了。
下面是aio派发任务的函数thread_pool_submit_aio,可以看到,池子中的线程数是动态增加的,如果有空闲的线程或者线程数已达上限是不会创建新的线程的,并且采用了信号量通知的方法来减少线程轮询开销,有任务的时候才放开一个额度,而不是让线程一直尝试拿后面的mutex锁再去看下request list是不是空。
qemu_mutex_lock(&pool->lock); if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) { spawn_thread(pool); } QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); qemu_mutex_unlock(&pool->lock); qemu_sem_post(&pool->sem); //这里会增加sem的计数,让一个idle的worker获得执行机会 return &req->common;
worker创建的时候以及worker在func执行完后会再次尝试获取pool->sem(等待sem>0),等待新的任务:
pool->idle_threads++; qemu_mutex_unlock(&pool->lock); ret = qemu_sem_timedwait(&pool->sem, 10000);//减扣sem计数 qemu_mutex_lock(&pool->lock); pool->idle_threads--;
池子资源释放的时候，会标记pool->stopping，并通过sem_post逐一唤醒所有worker，给它们最后一个任务：退出自己：
thread_pool_free: /* Stop new threads from spawning */ qemu_bh_delete(pool->new_thread_bh); pool->cur_threads -= pool->new_threads; pool->new_threads = 0; /* Wait for worker threads to terminate */ pool->stopping = true; while (pool->cur_threads > 0) { qemu_sem_post(&pool->sem); qemu_cond_wait(&pool->worker_stopped, &pool->lock); }