qemu里面有个服务于aio的线程池:
/*
 * Thread pool that services aio requests.  Worker threads are created on
 * demand up to max_threads; requests are queued on request_list and workers
 * are woken through sem.
 */
struct ThreadPool {
AioContext *ctx; /* AioContext this pool belongs to */
QEMUBH *completion_bh; /* NOTE(review): presumably the bottom half that reports finished requests - confirm */
QemuMutex lock; /* protects the fields listed under "protected by lock" below */
QemuCond check_cancel; /* NOTE(review): presumably paired with pending_cancellations - confirm */
QemuCond worker_stopped; /* waited on by thread_pool_free until cur_threads drops to 0 */
QemuSemaphore sem; /* posted once per submitted request; idle workers wait on it */
int max_threads; /* upper bound checked against cur_threads before spawning */
QEMUBH *new_thread_bh; /* bottom half that runs spawn_thread_bh_fn */
/* The following variables are only accessed from one AioContext. */
QLIST_HEAD(, ThreadPoolElement) head;
/* The following variables are protected by lock. */
QTAILQ_HEAD(, ThreadPoolElement) request_list; /* queued requests, appended at the tail on submit */
int cur_threads; /* incremented in spawn_thread, so it also counts threads not yet created */
int idle_threads; /* workers currently waiting on sem */
int new_threads; /* backlog of threads we need to create */
int pending_threads; /* threads created but not running yet */
int pending_cancellations; /* whether we need a cond_broadcast */
bool stopping; /* set by thread_pool_free before waiting for workers to terminate */
};
qemu在thread_pool_init_one建池子的时候注册了一个延迟执行的work:
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
这个work将在spawn_thread里面被安排调度
/*
 * Account for one more worker thread and, if no thread creation is already
 * in flight, schedule the bottom half that actually creates it.
 *
 * The caller shown in this file (thread_pool_submit_aio) invokes this with
 * pool->lock held.
 */
static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    /*
     * Count every call: not every call schedules the bottom half, so the
     * deferred creation path uses new_threads to know how many threads are
     * still owed.
     */
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        /* Nothing is in flight yet; if something were, do not schedule again. */
        qemu_bh_schedule(pool->new_thread_bh);
    }
}
当work被调度到的时候,执行spawn_thread_bh_fn
/*
 * Bottom-half callback registered as pool->new_thread_bh.
 *
 * @opaque: the ThreadPool that was passed when the bottom half was created.
 *
 * Takes the pool lock around do_spawn_thread(), which must run with the
 * lock held.
 */
static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *tp = opaque;

    qemu_mutex_lock(&tp->lock);
    do_spawn_thread(tp);
    qemu_mutex_unlock(&tp->lock);
}
/*
 * Create one detached worker thread if the backlog (new_threads) is not
 * empty; otherwise do nothing.
 *
 * Runs with lock taken.
 */
static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread worker;

    if (pool->new_threads != 0) {
        /* Move one thread from the backlog to "created but not running". */
        pool->new_threads -= 1;
        /*
         * While pending_threads is non-zero, spawn_thread() does not
         * schedule the bottom half again.
         */
        pool->pending_threads += 1;
        qemu_thread_create(&worker, "worker", worker_thread, pool,
                           QEMU_THREAD_DETACHED);
    }
}
这里采用了类似递归(链式)的方式来一次性创建多个worker:新创建的worker线程启动后会调用do_spawn_thread来创建下一个worker,直到发现new_threads为0。
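这里有个问题,如果qemu_thread_create创建新线程失败,那么就会导致后面新的线程永远无法创建,因为pending_threads不会被减扣为0,前面spawn_thread就不会再安排新的下半部工作来创建线程了。(注:上面代码中qemu_thread_create是按无返回值的方式调用的;据上游QEMU的实现,pthread_create失败时它会直接报错并终止进程,因此这个问题只有在qemu_thread_create被改成向调用者返回错误时才会真正出现,请对照所用版本确认。)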
下面是aio派发任务的函数thread_pool_submit_aio,可以看到,池子中的线程数是动态增加的,如果有空闲的线程或者线程数已达上限是不会创建新的线程的,并且采用了信号量通知的方法来减少线程轮询开销,有任务的时候才放开一个额度,而不是让线程一直尝试拿后面的mutex锁再去看下request list是不是空。
/* Excerpt from thread_pool_submit_aio: queue the request and wake a worker. */
qemu_mutex_lock(&pool->lock);
/* Grow the pool only when no worker is idle and the limit is not reached. */
if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
spawn_thread(pool);
}
QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
qemu_mutex_unlock(&pool->lock);
qemu_sem_post(&pool->sem); /* increments the sem count so that one idle worker gets a chance to run */
return &req->common;
worker创建的时候以及worker在func执行完后会再次尝试获取pool->sem(等待sem>0),等待新的任务:
/* Excerpt from the worker loop: wait for a request while counted as idle. */
pool->idle_threads++;
qemu_mutex_unlock(&pool->lock);
/* Consumes one count of sem; NOTE(review): 10000 is presumably a timeout in milliseconds - confirm. */
ret = qemu_sem_timedwait(&pool->sem, 10000);
qemu_mutex_lock(&pool->lock);
pool->idle_threads--;
池子资源释放的时候,会标记pool->stopping并给所有worker一个最后的任务(通过sem_post):释放自己:
thread_pool_free:
/*
 * Excerpt from thread_pool_free.
 * NOTE(review): pool->lock is presumably taken before this point, since
 * qemu_cond_wait below is passed &pool->lock - confirm against the full function.
 */
/* Stop new threads from spawning */
qemu_bh_delete(pool->new_thread_bh);
/* Threads still in the backlog were counted in cur_threads but will never be created. */
pool->cur_threads -= pool->new_threads;
pool->new_threads = 0;
/* Wait for worker threads to terminate */
pool->stopping = true;
while (pool->cur_threads > 0) {
/* Each iteration posts sem once to wake a worker, then waits on worker_stopped. */
qemu_sem_post(&pool->sem);
qemu_cond_wait(&pool->worker_stopped, &pool->lock);
}