这一篇来分析libuv的四个特殊的持续请求(uv_handle_t族),uv_idle_t,uv_check_t,uv_prepare_t,uv_async_t。它们直接以强类型保存在uv_loop_t中,而不像其它的持续请求保存在handle_queue中,也是最简单的持续请求。
在实现中,libuv利用C的内存布局模拟了面向对象的继承关系,uv_handle_t是基类,其所有子类都必须以其为结构体中的第一个字段。先来看看基类的定义吧,在include/uv.h的402行:
1 #define UV_HANDLE_FIELDS \ 2 /* public */ \ 3 void* data; \ 4 /* read-only */ \ 5 uv_loop_t* loop; \ 6 uv_handle_type type; \ 7 /* private */ \ 8 uv_close_cb close_cb; \ 9 void* handle_queue[2]; \10 union { \11 int fd; \12 void* reserved[4]; \13 } u; \14 UV_HANDLE_PRIVATE_FIELDS
1 #define UV_HANDLE_PRIVATE_FIELDS \2 uv_handle_t* endgame_next; \3 unsigned int flags;
data:用户数据
loop:所属的事件循环
type:类型标识,通过内存布局和这个标识,简明的实现了继承,并不比C++语言级别的继承差,甚至更为灵活。
close_cb:关闭时的回调,对于某些子类有效。
handle_queue:是为了存储在uv_loop_t.handle_queue所需的链表节点。
flags:一些状态标记,后续会讲到。
endgame_next:持续请求关闭链表所需的节点信息,持续请求在关闭时并不是立即执行关闭收尾工作的,而是将其加入到uv_loop_t.endgame_handle链表里的.
好了,现在可以直奔主题了。
uv_idle_t,uv_check_t,uv_prepare_t
这三个持续请求都是一样的作用,在每次事件循环时触发一次事件,只是优先级不同,有何不同,看uv_run函数里的while那一块就知道了,在src/core.c的387-418行:
1 while (r != 0 && loop->stop_flag == 0) { 2 uv_update_time(loop); 3 uv_process_timers(loop); 4 5 ran_pending = uv_process_reqs(loop); 6 uv_idle_invoke(loop); 7 uv_prepare_invoke(loop); 8 9 timeout = 0;10 if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)11 timeout = uv_backend_timeout(loop);12 13 (*poll)(loop, timeout);14 uv_check_invoke(loop);15 uv_process_endgames(loop);16 17 if (mode == UV_RUN_ONCE) {18 /* UV_RUN_ONCE implies forward progress: at least one callback must have19 * been invoked when it returns. uv__io_poll() can return without doing20 * I/O (meaning: no callbacks) when its timeout expires - which means we21 * have pending timers that satisfy the forward progress constraint.22 *23 * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from24 * the check.25 */26 uv_process_timers(loop);27 }28 29 r = uv__loop_alive(loop);30 if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)31 break;32 }
uv_idle_invoke()是处理idle的,uv_check_invoke是处理check的,uv_prepare_invoke是处理prepare的,因此可以得出:
- idle在poll前触发事件(poll是io处理,以后会说)
- prepare在idle后触发事件
- check在poll后触发事件
这三类持续请求的实现在src/loop-watcher.c中,用了宏模拟了模板,其实现都是一样的,这里就只谈uv_idle_t了。
先看uv_idle_t的定义,在include/uv.h的750行:
struct uv_idle_s { UV_HANDLE_FIELDS UV_IDLE_PRIVATE_FIELDS};
HV_IDLE_PRIVATE_FIELDS在uv.h的551行:
1 #define UV_IDLE_PRIVATE_FIELDS \2 uv_idle_t* idle_prev; \3 uv_idle_t* idle_next; \4 uv_idle_cb idle_cb
先是基类的字段,然后是自己的字段,东西很少,只有一个回调,两个链表指针域,到这里可以知道,其是用链表存储的,头结点在uv_loop_t.idle_handle。check和prepare也是一样的结构。
先来回忆一下uv_idle_t怎么用,然后从用的流程探究其实现细节,是这样用的:
uv_idle_t idle;
uv_idle_init(uv_default_loop(),&idle);
uv_idle_start(&idle); //关闭可以用uv_idle_stop()
uv_run(uv_default_loop(),UV_RUN_DEFAULT);
libuv有个特色,所有的对象是init的,而不是create的,也就是内存都有外部申请,然后传给库做初始化,这样库本身就不管理内存了,从而使其更加通用。
init,start,stop三个函数定义在loop-watcher.c中,用宏定义的,来看看C的模板吧:
1 #define UV_LOOP_WATCHER_DEFINE(name, NAME) \ 2 int uv_##name##_init(uv_loop_t* loop, uv_##name##_t* handle) { \ 3 uv__handle_init(loop, (uv_handle_t*) handle, UV_##NAME); \ 4 \ 5 return 0; \ 6 } \ 7 \ 8 \ 9 int uv_##name##_start(uv_##name##_t* handle, uv_##name##_cb cb) { \10 uv_loop_t* loop = handle->loop; \11 uv_##name##_t* old_head; \12 \13 assert(handle->type == UV_##NAME); \14 \15 if (uv__is_active(handle)) \16 return 0; \17 \18 if (cb == NULL) \19 return UV_EINVAL; \20 \21 old_head = loop->name##_handles; \22 \23 handle->name##_next = old_head; \24 handle->name##_prev = NULL; \25 \26 if (old_head) { \27 old_head->name##_prev = handle; \28 } \29 \30 loop->name##_handles = handle; \31 \32 handle->name##_cb = cb; \33 uv__handle_start(handle); \34 \35 return 0; \36 } \37 \38 \39 int uv_##name##_stop(uv_##name##_t* handle) { \40 uv_loop_t* loop = handle->loop; \41 \42 assert(handle->type == UV_##NAME); \43 \44 if (!uv__is_active(handle)) \45 return 0; \46 \47 /* Update loop head if needed */ \48 if (loop->name##_handles == handle) { \49 loop->name##_handles = handle->name##_next; \50 } \51 \52 /* Update the iterator-next pointer of needed */ \53 if (loop->next_##name##_handle == handle) { \54 loop->next_##name##_handle = handle->name##_next; \55 } \56 \57 if (handle->name##_prev) { \58 handle->name##_prev->name##_next = handle->name##_next; \59 } \60 if (handle->name##_next) { \61 handle->name##_next->name##_prev = handle->name##_prev; \62 } \63 \64 uv__handle_stop(handle); \65 \66 return 0; \67 } \68 \69 \70 void uv_##name##_invoke(uv_loop_t* loop) { \71 uv_##name##_t* handle; \72 \73 (loop)->next_##name##_handle = (loop)->name##_handles; \74 \75 while ((loop)->next_##name##_handle != NULL) { \76 handle = (loop)->next_##name##_handle; \77 (loop)->next_##name##_handle = handle->name##_next; \78 \79 handle->name##_cb(handle); \80 } \81 }82 83 UV_LOOP_WATCHER_DEFINE(prepare, PREPARE)84 UV_LOOP_WATCHER_DEFINE(check, CHECK)85 UV_LOOP_WATCHER_DEFINE(idle, IDLE)
这种模板虽不自动化,但是不是更明显的表达了意图,若把uv_##name##_t换做T,就更像了。
先来看uv_idle_init,这个函数很简单,仅调用了uv_handle_init,可以看做调用父类的构造函数,与C++的子类构造时自动调用父类的构造函数是一致的,很简单的一个理念,谁的成员谁来管。调这个函数,传入loop,自己的指针与一个UV_IDLE的东西,看看这个函数吧,它是宏,在src/uv-common.h的208行:
#define uv__handle_init(loop_, h, type_) \ do { \ (h)->loop = (loop_); \ (h)->type = (type_); \ (h)->flags = UV__HANDLE_REF; /* Ref the loop when active. */ \ QUEUE_INSERT_TAIL(&(loop_)->handle_queue, &(h)->handle_queue); \ uv__handle_platform_init(h); \ } \ while (0)
下面用base来表示uv_handle_t。UV_IDLE就是子类的标识,设置在base.type中,用来做父 is 子的判断,然后将对象加入uv_loop_t.handle_queue队列,UV_HANDLE_REF表示是否初始化过了。到这里,内存布局+type标识的手法模拟继承,已经十分完善灵活了。好了,uv_idle_init仅仅初始化了父类。这里其实是不严谨安全的,因为其他字段都没管,都是野值,应该要memset一下。
再来看uv_idle_start(),它做了两件事:
- 用头插法将自己加入uv_loopt_t.idle_handle
- 调用uv__handle_start()设置了一些标志,在base.flags里又加上了UV_HANDLE_ACTIVE,同时增加了uv_loop_t.active_handle计数.
ps:
13行就用了base.type做了is的判断.
到这里,我们见了base.flags的两个标识:UV_HANDLE_REF和UV_HANDLE_ACTIVE.前者大概表示是否初始化了基类,后者大概表示是否初始化了子类.
那uv_idle_t的事件是如何在事件循环里触发的呢?上面不是说了uv_run里调了uv_idle_invok函数么,就在这里触发的,这个函数也定义在上面的模板里,它的功能很简单,遍历uv_loop_t.idle_handle链表,调用每个节点的回调(在uv_idle_start()里设置的)。
最后再来看看uv_idle_stop(),它也很简单,将自己从链表里删掉,在调用uv__handle_stop重置了base.flags,递减了uv_loop_t.active_handle计数。在这里并没有删除uv_loop_t.handle_queue里的保存,当初看到这里的时候我还以为是泄露bug,还特意向libuv的邮件列表发了bug,然后官方回复我,这个保存是在uv_close里删除的。
那么就来看看uv_close吧,这是持续请求的退出流程,在src/handle.c的67行:
void uv_close(uv_handle_t* handle, uv_close_cb cb) { uv_loop_t* loop = handle->loop; if (handle->flags & UV__HANDLE_CLOSING) { assert(0); return; } handle->close_cb = cb; /* Handle-specific close actions */ switch (handle->type) { case UV_TCP: uv_tcp_close(loop, (uv_tcp_t*)handle); return; case UV_NAMED_PIPE: uv_pipe_close(loop, (uv_pipe_t*) handle); return; case UV_TTY: uv_tty_close((uv_tty_t*) handle); return; case UV_UDP: uv_udp_close(loop, (uv_udp_t*) handle); return; case UV_POLL: uv_poll_close(loop, (uv_poll_t*) handle); return; case UV_TIMER: uv_timer_stop((uv_timer_t*)handle); uv__handle_closing(handle); uv_want_endgame(loop, handle); return; case UV_PREPARE: uv_prepare_stop((uv_prepare_t*)handle); uv__handle_closing(handle); uv_want_endgame(loop, handle); return; case UV_CHECK: uv_check_stop((uv_check_t*)handle); uv__handle_closing(handle); uv_want_endgame(loop, handle); return; case UV_IDLE: uv_idle_stop((uv_idle_t*)handle); uv__handle_closing(handle); uv_want_endgame(loop, handle); return; case UV_ASYNC: uv_async_close(loop, (uv_async_t*) handle); return; case UV_SIGNAL: uv_signal_close(loop, (uv_signal_t*) handle); return; case UV_PROCESS: uv_process_close(loop, (uv_process_t*) handle); return; case UV_FS_EVENT: uv_fs_event_close(loop, (uv_fs_event_t*) handle); return; case UV_FS_POLL: uv__fs_poll_close((uv_fs_poll_t*) handle); uv__handle_closing(handle); uv_want_endgame(loop, handle); return; default: /* Not supported */ abort(); }}
利用base.type做判断,调用了不同的close函数,我们只需要看case UV_IDLE块即可,这块里调用了uv_idle_stop,因此也可以直接调用uv_close而不调stop,stop里利用了base.flags做了重复保护的。来看uv__handle__closing干了什么,在src/handle-inl.c的63行:
1 #define uv__handle_closing(handle) \ 2 do { \ 3 assert(!((handle)->flags & UV__HANDLE_CLOSING)); \ 4 \ 5 if (!(((handle)->flags & UV__HANDLE_ACTIVE) && \ 6 ((handle)->flags & UV__HANDLE_REF))) \ 7 uv__active_handle_add((uv_handle_t*) (handle)); \ 8 \ 9 (handle)->flags |= UV__HANDLE_CLOSING; \10 (handle)->flags &= ~UV__HANDLE_ACTIVE; \11 } while (0)
主要是设置base.flags为UV_HANDLE_CLOSING,第7行又把uv_loop_t.active_handle递增了,uv__handle_stop里做了递减,是一个冗余吧。
看最后的uv_want_endgame(),在src/handle-inl.c的88行:
1 INLINE static void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle) {2 if (!(handle->flags & UV_HANDLE_ENDGAME_QUEUED)) {3 handle->flags |= UV_HANDLE_ENDGAME_QUEUED;4 5 handle->endgame_next = loop->endgame_handles;6 loop->endgame_handles = handle;7 }
很简单,在base.flags里设置了UV_HANDLE_ENDGAME_QUEUED,然后将自己加入了uv_loop_t.endgame_handle链表。
在来看持续请求生命周期的最后一步,在uv_run()中的uv_process_endgame()里,在handle-inl.c的98行:
1 INLINE static void uv_process_endgames(uv_loop_t* loop) { 2 uv_handle_t* handle; 3 4 while (loop->endgame_handles) { 5 handle = loop->endgame_handles; 6 loop->endgame_handles = handle->endgame_next; 7 8 handle->flags &= ~UV_HANDLE_ENDGAME_QUEUED; 9 10 switch (handle->type) {11 case UV_TCP:12 uv_tcp_endgame(loop, (uv_tcp_t*) handle);13 break;14 15 case UV_NAMED_PIPE:16 uv_pipe_endgame(loop, (uv_pipe_t*) handle);17 break;18 19 case UV_TTY:20 uv_tty_endgame(loop, (uv_tty_t*) handle);21 break;22 23 case UV_UDP:24 uv_udp_endgame(loop, (uv_udp_t*) handle);25 break;26 27 case UV_POLL:28 uv_poll_endgame(loop, (uv_poll_t*) handle);29 break;30 31 case UV_TIMER:32 uv_timer_endgame(loop, (uv_timer_t*) handle);33 break;34 35 case UV_PREPARE:36 case UV_CHECK:37 case UV_IDLE:38 uv_loop_watcher_endgame(loop, handle);39 break;40 41 case UV_ASYNC:42 uv_async_endgame(loop, (uv_async_t*) handle);43 break;44 45 case UV_SIGNAL:46 uv_signal_endgame(loop, (uv_signal_t*) handle);47 break;48 49 case UV_PROCESS:50 uv_process_endgame(loop, (uv_process_t*) handle);51 break;52 53 case UV_FS_EVENT:54 uv_fs_event_endgame(loop, (uv_fs_event_t*) handle);55 break;56 57 case UV_FS_POLL:58 uv__fs_poll_endgame(loop, (uv_fs_poll_t*) handle);59 break;60 61 default:62 assert(0);63 break;64 }65 }66 }
与uv_close一样,做了is判断后,调用子模块的相关函数,只需看case UV_IDLE块,里面调用了uv_loop_watcher_endgame(),这个函数调用了uv__handle_close(),这个函数在handle-ini.c的76行:
1 #define uv__handle_close(handle) \ 2 do { \ 3 QUEUE_REMOVE(&(handle)->handle_queue); \ 4 uv__active_handle_rm((uv_handle_t*) (handle)); \ 5 \ 6 (handle)->flags |= UV_HANDLE_CLOSED; \ 7 \ 8 if ((handle)->close_cb) \ 9 (handle)->close_cb((uv_handle_t*) (handle)); \10 } while (0
很简单,在base.flags里加上UV_HANDLE_CLOSED,调用结束回调,将其从uv_loop_t.handle_queue里删除。
好了,到这里,我们分析了一个持续请求的整个生命周期,这个周期可以是用base.flags来标记的,这里总结一下:
UV_HANDLE_REF -> UV_HANDLE_ACTIVE -> UV_HANDLE_CLOSING -> UV_HANDLE_ENDGAME_QUEUED -> UV_HANDLE_CLOSED
正好对应了handle的几个方法:
uv__handle_init() -> uv__handle_start() -> uv__handle_stop() - > uv__handle_closing() -> uv_want_endgame()-> uv__handle_close()
其实uv_close和uv_want_endgame这两个方法还可以用uv_handle_t里加一个函数指针(表示虚表),子类在初始化时改写它,从而模拟一个多态来简化switch判断。
uv_async_t
这个持续请求用来做不同事件循环中(线程间通信)通信的,唯一一个有线程安全的函数的模块,其他子模块也使用了它,比较重要。来看看它的定义吧,在include/uv.h的760行:
struct uv_async_s { UV_HANDLE_FIELDS UV_ASYNC_PRIVATE_FIELDS};
UV_ASYNC_PRIVATE_FIELDS在src/uv-win.h的535行:
1 #define UV_ASYNC_PRIVATE_FIELDS \2 struct uv_req_s async_req; \3 uv_async_cb async_cb; \4 /* char to avoid alignment issues */ \5 char volatile async_sent;
非常简单,两个基类,一个回调,async_send可以看做锁,是uv_async_send的关键。
这个持续请求既属于uv_handle_t,又属于uv_req_t.那就先来看uv_req_t的定义吧,在include/uv.h的381行:
1 /* Abstract base class of all requests. */2 struct uv_req_s {3 UV_REQ_FIELDS4 }
UV_REQ_FIELDS在370行:
#define UV_REQ_FIELDS \ /* public */ \ void* data; \ /* read-only */ \ uv_req_type type; \ /* private */ \ void* active_queue[2]; \ void* reserved[4]; \ UV_REQ_PRIVATE_FIELDS
UV_REQ_PRIVATE_FIELDS在src/uv-win.h的359行:
#define UV_REQ_PRIVATE_FIELDS \ union { \ /* Used by I/O operations */ \ struct { \ OVERLAPPED overlapped; \ size_t queued_bytes; \ } io; \ } u; \ struct uv_req_s* next_req
data:用户数据域。
type:子类标识。
active_queue:队列指针域,为了保存在uv_loop_t.active_queue所必须的。
io:投递iocp请求所必须的重叠结构。
好了,一样,先回忆下它是如何用的:
uv_async_t asy;
uv_async_init(uv_default_loop(),&asy,on_async);
uv_run(uv_default_loop(),UV_RUN_DEFAULT);
然后,外部用uv_async_send()这个函数就可以触发一次事件,就好像唤醒一个线程一样。这个函数是线程安全的,同时也有一个特性:对同一个请求对象,多次调用时只保证至少触发一次,就好像windows的消息循环将同类消息合并了一样,具体细节后面分析时再说。
先看uv_async_init干了什么,在src/async.c的40行:
1 int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) { 2 uv_req_t* req; 3 4 uv__handle_init(loop, (uv_handle_t*) handle, UV_ASYNC); 5 handle->async_sent = 0; 6 handle->async_cb = async_cb; 7 8 req = &handle->async_req; 9 uv_req_init(loop, req);10 req->type = UV_WAKEUP;11 req->data = handle;12 13 uv__handle_start(handle);14 15 return 0;
主要是初始化两个基类,持续请求用UV_ASYNC标识自己,进入UV_HANDLE_ACTIVE的阶段。一次性请求用UV_WAKEUP标识自己,它的一些生命周期后面再说。
到这里,什么也看不出,看uv_async_send吧,在src/async.c的67行:
1 int uv_async_send(uv_async_t* handle) { 2 uv_loop_t* loop = handle->loop; 3 4 if (handle->type != UV_ASYNC) { 5 /* Can't set errno because that's not thread-safe. */ 6 return -1; 7 } 8 9 /* The user should make sure never to call uv_async_send to a closing */10 /* or closed handle. */11 assert(!(handle->flags & UV__HANDLE_CLOSING));12 13 if (!uv__atomic_exchange_set(&handle->async_sent)) {14 POST_COMPLETION_FOR_REQ(loop, &handle->async_req);15 }16 17 return 0;
关键在于13-15行,从名字上看,用cas原语做了一次锁判断,持有锁的发送一个请求。直接来看看uv_atomic_exchange_set到底是啥吧,在src/atomicops-inl.h的37行:
1 static char __declspec(inline) uv__atomic_exchange_set(char volatile* target) {2 return _InterlockedOr8(target, 1);
果然是的,_InterlockedOr8是一个逻辑或原语,返回旧值。因此现在对uv_async_send可以这样理解了:它是线程安全的,发送请求会置位async_send,再async_send没有被复位之前,后续的请求都会被忽略。
看看POST_COMPLETION_FOR_REQ干了什么,在src/req-inl.h的73行:
1 #define POST_COMPLETION_FOR_REQ(loop, req) \2 if (!PostQueuedCompletionStatus((loop)->iocp, \3 0, \4 0, \5 &((req)->u.io.overlapped))) { \6 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); \7
哦,用uv_req_t的重叠结构发了一个iocp请求,看来最终事件的触发应该和GetCompletionStatus有关了。这里简单的说一下iocp,可以把它理解为有两个队列的东西,一个请求队列,一个通知队列,外部可以向其投递请求,告诉它我要做某事,这些请求就放在请求队列中,操作系统是马仔,它会用另外的线程去处理请求,每处理一个就往通知队列中放一个处理结果信息。使用者只需要交待事情,得知结果。
我们直接进屋去看看犹抱琵琶半遮面的uv_async_t,iocp结果的处理在事件循环的poll函数中,去看看吧,在uv_run函数中,src/core.c的372行:
1 void (*poll)(uv_loop_t* loop, DWORD timeout);2 3 if (pGetQueuedCompletionStatusEx)4 poll = &uv_poll_ex;5 else6 poll = &uv_poll
候选两个版本的函数,优先选扩展版的,真是注意性能呀,我们看最好的uv_poll_ex,在305行:
1 static void uv_poll_ex(uv_loop_t* loop, DWORD timeout) { 2 BOOL success; 3 uv_req_t* req; 4 OVERLAPPED_ENTRY overlappeds[128]; 5 ULONG count; 6 ULONG i; 7 int repeat; 8 uint64_t timeout_time; 9 10 timeout_time = loop->time + timeout;11 12 for (repeat = 0; ; repeat++) {13 success = pGetQueuedCompletionStatusEx(loop->iocp,14 overlappeds,15 ARRAY_SIZE(overlappeds),16 &count,17 timeout,18 FALSE);19 20 if (success) {21 for (i = 0; i < count; i++) {22 /* Package was dequeued */23 req = uv_overlapped_to_req(overlappeds[i].lpOverlapped);24 uv_insert_pending_req(loop, req);25 }26 27 /* Some time might have passed waiting for I/O,28 * so update the loop time here.29 */30 uv_update_time(loop);31 } else if (GetLastError() != WAIT_TIMEOUT) {32 /* Serious error */33 uv_fatal_error(GetLastError(), "GetQueuedCompletionStatusEx");34 } else if (timeout > 0) {35 /* GetQueuedCompletionStatus can occasionally return a little early.36 * Make sure that the desired timeout target time is reached.37 */38 uv_update_time(loop);39 if (timeout_time > loop->time) {40 timeout = (DWORD)(timeout_time - loop->time);41 /* The first call to GetQueuedCompletionStatus should return very42 * close to the target time and the second should reach it, but43 * this is not stated in the documentation. To make sure a busy44 * loop cannot happen, the timeout is increased exponentially45 * starting on the third round.46 */47 timeout += repeat ? (1 << (repeat - 1)) : 0;48 continue;49 }50 }51 break;52 }53 }
很简单,一部分是利用GetQueueCompletionStatusEx收取iocp的结果信息,一部分是计时器的时间处理。GetQueueCompletionStatusEx和GetQueueCompletionStatus的区别在于,前者可以一次收取多个结果通知,后者一次只能收取一个结果通知。在这只需要看收取通知后处理的部分,其他的在分析timer时再说,看上面的21-25行即可。
这几行代码做了两件事:将重叠结构指针转为一次性请求对象,将一次性请求对象加入uv_loop_t.pending_reqs_tail。
uv_overlapped_to_req()不得不说,这是C语言的一个奇淫技巧,根据结构体的某个成员指针计算出结构体指针,非常有用,用于一些数据结构实现中,可以节省一个数据域指针的开销。最初见与linux内核的container_of宏,实现是这样的:
1 #define CONTAINING_RECORD(address, type, field) ((type *)( \2 (PCHAR)(address) - \3 (ULONG_PTR)(&((type *)0)->field))
因为成员地址是大于结构体首地址的,所以只需将成员地址-成员的偏移址即可得,那偏移址如何计算,用0强转为一个同类型结构,然后取对应成员地址。
uv_loop_t.pending_req_tail是一个循环单链表,uv_req_t.next_req就是指针域。uv_insert_pending_req()用尾插法将获取的结果通知对应的请求插入这个链表中。那么事件的触发应该就在消费这个链表的地方,在那呢,在uv_run()中调用uv_process_reqs消费的,这个方法在src/req-inl.c的144行:
1 INLINE static int uv_process_reqs(uv_loop_t* loop) { 2 uv_req_t* req; 3 uv_req_t* first; 4 uv_req_t* next; 5 6 if (loop->pending_reqs_tail == NULL) 7 return 0; 8 9 first = loop->pending_reqs_tail->next_req;10 next = first;11 loop->pending_reqs_tail = NULL;12 13 while (next != NULL) {14 req = next;15 next = req->next_req != first ? req->next_req : NULL;16 17 switch (req->type) {18 case UV_READ:19 DELEGATE_STREAM_REQ(loop, req, read, data);20 break;21 22 case UV_WRITE:23 DELEGATE_STREAM_REQ(loop, (uv_write_t*) req, write, handle);24 break;25 26 case UV_ACCEPT:27 DELEGATE_STREAM_REQ(loop, req, accept, data);28 break;29 30 case UV_CONNECT:31 DELEGATE_STREAM_REQ(loop, (uv_connect_t*) req, connect, handle);32 break;33 34 case UV_SHUTDOWN:35 /* Tcp shutdown requests don't come here. */36 assert(((uv_shutdown_t*) req)->handle->type == UV_NAMED_PIPE);37 uv_process_pipe_shutdown_req(38 loop,39 (uv_pipe_t*) ((uv_shutdown_t*) req)->handle,40 (uv_shutdown_t*) req);41 break;42 43 case UV_UDP_RECV:44 uv_process_udp_recv_req(loop, (uv_udp_t*) req->data, req);45 break;46 47 case UV_UDP_SEND:48 uv_process_udp_send_req(loop,49 ((uv_udp_send_t*) req)->handle,50 (uv_udp_send_t*) req);51 break;52 53 case UV_WAKEUP:54 uv_process_async_wakeup_req(loop, (uv_async_t*) req->data, req);55 break;56 57 case UV_SIGNAL_REQ:58 uv_process_signal_req(loop, (uv_signal_t*) req->data, req);59 break;60 61 case UV_POLL_REQ:62 uv_process_poll_req(loop, (uv_poll_t*) req->data, req);63 break;64 65 case UV_PROCESS_EXIT:66 uv_process_proc_exit(loop, (uv_process_t*) req->data);67 break;68 69 case UV_FS_EVENT_REQ:70 uv_process_fs_event_req(loop, req, (uv_fs_event_t*) req->data);71 break;72 73 default:74 assert(0);75 }76 }77 78 return 1;79 }
遍历链表,根据每个一次性请求的type调用不同子系统方法,uv_async_t的type为UV_WAKEUP,所以只需要看uv_process_async_wakeup_req(),它在src/async.c的87行:
1 void uv_process_async_wakeup_req(uv_loop_t* loop, uv_async_t* handle, 2 uv_req_t* req) { 3 assert(handle->type == UV_ASYNC); 4 assert(req->type == UV_WAKEUP); 5 6 handle->async_sent = 0; 7 8 if (handle->flags & UV__HANDLE_CLOSING) { 9 uv_want_endgame(loop, (uv_handle_t*)handle);10 } else if (handle->async_cb != NULL) {11 handle->async_cb(handle);12 }
这里是调用回调,复位async_send。到这里,就对uv_async_t的机制了然于胸了。uv_async_send可以下个最终定义了:如果当前对象没有被占用,则占用它并触发一次事件,否则被忽略。这个方法是线程安全的。什么是占有,成功调用uv_async_send直到回调被调用这个时间段。
最后来看看它是如何销毁的,与持续性请求一样,进入UV_HANDLE_ACTICE之后的流程(closing,然后加入endgame_handles),可以调用uv_async_close或者uv_close.
好了,到这里,四大特殊持续请求就分析完了,也对一次性请求有了一个初步认识,知道它是先放入uv_loop_t.pending_reqs_tail链表中,然后在uv_process_reqs来处理的。
未完待续,下一篇分析tcp。