libco源码学习(一)

协程,用一句话来描述就是用户态的轻量级线程。原有线程在调度上都是通过系统来完成的,因此每次切换阻塞都存在额外的开销。多线程在访问一些临界区资源时必须依靠锁等互斥机制。这也是多线程编程难的原因之一。而协程,则是将多协程之间的调度控制权交给了用户,协程拥有自己的寄存器上下文和栈。这使得在进行协程调度切换时,可以将寄存器上下文和栈保存到其他地方(where?),在切回来的时候,就可以恢复先前保存的寄存器上下文和栈,从而继续执行。多个协程其实仍是在单线程内执行,因此也不存在锁,系统调用,函数上下文切换等开销。

在并发编程中,线程是抢占式多任务,而协程则是协作式多任务。而由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心的处理同步问题,稍有不慎就会出错。而协程则完全不存在这个问题。

可是在协程实现的细节上有些问题引起了我的兴趣:

  • 可以知道,在函数调用过程中寄存器rbp,rsp(32位ebp,esp)等保存了栈底指针和栈顶指针,依靠这两个寄存器,程序可以在完成函数调用之后继续执行。那么在多个协程的情况下,如果协程自己的寄存器上下文和栈是独占还是共享的,那么在多协程情况下,寄存器上下文和栈的读写必然存在冲突,如何保证其正确性?
  • 从以上分析来看,似乎每个协程都有一个自己的独立的寄存器上下文和栈显得更为合适且在实现上难度相对来说会容易不少,那么问题又来了,这样的数据保存在哪里?这块空间的大小应该就直接决定了该协程库所支持的并发协程数。
  • 作为一个库来说,支持并发数如果上不去,必然没有优势。因此寄存器上下文和栈所占内存是否都是从堆上分配的?

libco是目前微信后台大规模使用的C/C++协程库。号称2013年至今稳定运行在微信后台的数万台机器上。因此学习libco是深入协程技术细节的很好切入点。

带着问题去学习比较有针对性,也便于加深理解。那么就一个一个问题来给心中的疑惑进行答疑吧^.^

  • 协程的寄存器上下文和栈是独占还是共享的?

答案是可有用户指定,通过stCoRoutineAttr结构体进行属性设置,其中有个stShareStack_t类型指针,当该指针非空时,表明使用共享栈空间。这部分空间由用户完成申请(libco提供了创建共享栈的函数stShareStack_t* co_alloc_sharestack(int iCount, int iStackSize)),指定栈队列大小等,后续构造出来的协程对象循环使用栈队列上空间,此时,会有若干的协程共享同一份栈空间。否则则有程序内进行单独的栈空间申请构造,供协程使用,此时每个协程独享自己的栈空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int co_create( stCoRoutine_t **co,const stCoRoutineAttr_t *attr,
void *(*routine)(void*),void *arg );
//可以看出创建协程时,可以通过stCoRoutineAttr_t接口体进行属性设置
//然后我们来看下stCoRoutineAttr结构内部都有那些字段
struct stCoRoutineAttr_t
{
int stack_size;
stShareStack_t* share_stack;
stCoRoutineAttr_t()
{
stack_size = 128 * 1024;
share_stack = NULL;
}
}__attribute__ ((packed));
struct stStackMem_t
{
stCoRoutine_t* occupy_co;
int stack_size;
char* stack_bp; //stack_buffer + stack_size
char* stack_buffer;
};
struct stShareStack_t
{
unsigned int alloc_idx;
int stack_size;
int count;
stStackMem_t** stack_array;
};

接下来让我们研究一下大名鼎鼎的协程是何方神圣。话不多,上代码!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
struct stCoRoutine_t
{
stCoRoutineEnv_t *env;
pfn_co_routine_t pfn;
void *arg;
coctx_t ctx;
char cStart;
char cEnd;
char cIsMain;
char cEnableSysHook;
char cIsShareStack;
void *pvEnv;
//char sRunStack[ 1024 * 128 ];
stStackMem_t* stack_mem;
//save stack buffer while confilct on same stack_buffer;
char* stack_sp;
unsigned int save_size;
char* save_buffer;
stCoSpec_t aSpec[1024];
};
struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ];
int iCallStackSize;
stCoEpoll_t *pEpoll;
//for copy stack log lastco and nextco
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};
typedef void *(*pfn_co_routine_t)( void * );
struct coctx_t
{
#if defined(__i386__)
void *regs[ 8 ];
#else
void *regs[ 14 ];
#endif
size_t ss_size;
char *ss_sp;
};
//光从结构体字段,名称及包含关系猜测,协程内部通过stack_mem保存了协程使用的栈空间,coctx_t保存了寄存器上下文

可以看到X86架构下有8个通用寄存器,X64则有16个寄存器,那么为什么这里针对64位环境下,仅开辟了14个空间用来拷贝寄存器,结合源码中coctx_swap.S中对寄存器的备份操作,可以发现64位下缺少了对%r10, %r11寄存器的备份。
那么为什么不需要对这两个寄存器来进行备份呢?这里我们需要先了解一下通用寄存器:

  • 寄存器通常被说成寄存器文件,其实就是CPU上的一块存储区域
  • X86-64中所有寄存器都是64位,相比32位的X86来说,标识符发生变化,比如:从原来的%ebp变成了%rbp,为了向后兼容,%ebp依然可以使用,不过指向了%rbp的低32位
  • X86-64寄存器的变化,不仅体现在位数上,更加体现在寄存器数量上,新增%r8-%r15。

x86-64的16个64位寄存器分别是:%rax, %rbx, %rcx, %rdx, %esi, %edi, %rbp, %rsp, %r8-%r15。其中:

  • %rax 作为函数返回值使用
  • %rsp栈指针寄存器,指向栈顶
  • %rdi,%rsi,%rdx,%rcx,%r8,%r9 用作函数参数,依次对应第1参数,第2参数。。。
  • %rbx,%rbp,%r12,%r13,%14,%15 用作数据存储,遵循被调用者保护规则,简单说就是随便用,调用子函数之前要备份它,以防他被修改
  • %r10,%r11 用作数据存储,遵循调用者保护规则,简单说就是使用之前要先保存原值

咦,貌似出现了两个陌生的名词:调用者保护&被调用者保护。

  • 调用者保护:表示这些寄存器上存储的值,需要调用者(父函数)自己想办法先备份好,否则过会子函数直接使用这些寄存器将无情的覆盖。如何备份?当然是实现压栈(pushl),等子函数调用完成,再通过栈恢复(popl)
  • 被调用者保护:即表示需要由被调用者(子函数)想办法帮调用者(父函数)进行备份

插曲告一段落,还是深入实现看一下,那么先看下协程创建函数中做的工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() )
{
co_init_curr_thread_env();
}
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
return 0;
}
static stCoRoutineEnv_t* g_arrCoEnvPerThread[ 204800 ] = { 0 };
void co_init_curr_thread_env()
{
pid_t pid = GetPid();
g_arrCoEnvPerThread[ pid ] = (stCoRoutineEnv_t*)calloc( 1,sizeof(stCoRoutineEnv_t) );
stCoRoutineEnv_t *env = g_arrCoEnvPerThread[ pid ];
env->iCallStackSize = 0;
struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
self->cIsMain = 1;
env->pending_co = NULL;
env->occupy_co = NULL;
coctx_init( &self->ctx );
env->pCallStack[ env->iCallStackSize++ ] = self;
stCoEpoll_t *ev = AllocEpoll();
SetEpoll( env,ev );
}
//co_init_curr_thread_env()初始化协程所在线程相关环境变量
  • 由此可以确定,通过同一线程创建出来的协程,都是运行在这个线程之上,因此可以保证了其串行执行,调度由用户完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
pfn_co_routine_t pfn,void *arg )
{
stCoRoutineAttr_t at;
if( attr )
{
memcpy( &at,attr,sizeof(at) );
}
if( at.stack_size <= 0 )
{
at.stack_size = 128 * 1024;
}
else if( at.stack_size > 1024 * 1024 * 8 )
{
at.stack_size = 1024 * 1024 * 8;
}
if( at.stack_size & 0xFFF )
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
}
stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
memset( lp,0,(long)(sizeof(stCoRoutine_t)));
lp->env = env;
lp->pfn = pfn;
lp->arg = arg;
stStackMem_t* stack_mem = NULL;
if( at.share_stack )
{
stack_mem = co_get_stackmem( at.share_stack);
at.stack_size = at.share_stack->stack_size;
}
else
{
stack_mem = co_alloc_stackmem(at.stack_size);
}
lp->stack_mem = stack_mem;
lp->ctx.ss_sp = stack_mem->stack_buffer;
lp->ctx.ss_size = at.stack_size;
lp->cStart = 0;
lp->cEnd = 0;
lp->cIsMain = 0;
lp->cEnableSysHook = 0;
lp->cIsShareStack = at.share_stack != NULL;
lp->save_size = 0;
lp->save_buffer = NULL;
return lp;
}
//分析co_create_env逻辑,首先每个协程栈空间最大不超过8MB,且默认情况下为128KB
if( at.stack_size & 0xFFF )
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
}
//同时上面的代码,对stack_size进行了取整处理,即将用户设置的值,向上取整为4096的整数倍。
//至于为什么要如此处理,待后续深入
stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
stStackMem_t* stack_mem = NULL;
if( at.share_stack )
{
stack_mem = co_get_stackmem( at.share_stack);
at.stack_size = at.share_stack->stack_size;
}
else
{
stack_mem = co_alloc_stackmem(at.stack_size);
}
lp->stack_mem = stack_mem;
stStackMem_t* co_alloc_stackmem(unsigned int stack_size)
{
stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t));
stack_mem->occupy_co= NULL;
stack_mem->stack_size = stack_size;
stack_mem->stack_buffer = (char*)malloc(stack_size);
stack_mem->stack_bp = stack_mem->stack_buffer + stack_size;
return stack_mem;
}
//这行代码很好的印证了问题二,三的猜测以及给出了肯定的答案。
//即协程所占用的内存均是从堆上分配的,这么一来可支持的协程数就会得到较好的保证

哈哈,看来自己思考的深度还是远远不够啊,想到的三个问题,光看了几个结构体和一个函数实现就得到了解答。在以后的博客中(计划还是分多篇博客进行介绍比较好,全集中在一块,难免容易疲劳),我将主要顺着libco接口使用方式的角度来分析一下内部逻辑,比如stCoRoutineEnv_t的作用使用共享栈的协程之间,如何保证栈信息的一致性和有效性协程内部上下文的切换