`

《Linux多线程服务端编程:使用muduo C++网络库》书摘6.6.2节

 
阅读更多

6.6.2 常见的并发网络服务程序设计方案

W. Richard Stevens 的《UNIX 网络编程(第2 版)》第27 章“Client-ServerDesign Alternatives”介绍了十来种当时(20 世纪90 年代末)流行的编写并发网络程序的方案。[UNP] 第3 版第30 章,内容未变,还是这几种。以下简称UNP CSDA方案。[UNP] 这本书主要讲解阻塞式网络编程,在非阻塞方面着墨不多,仅有一章。正确使用non-blocking IO 需要考虑的问题很多,不适宜直接调用Sockets API,而需要一个功能完善的网络库支撑。

随着2000 年前后第一次互联网浪潮的兴起,业界对高并发HTTP 服务器的强烈需求大大推动了这一领域的研究,目前高性能httpd 普遍采用的是单线程Reactor方式。另外一个说法是IBM Lotus 使用TCP 长连接协议,而把Lotus 服务端移植到Linux 的过程中IBM 的工程师们大大提高了Linux 内核在处理并发连接方面的可伸缩性,因为一个公司可能有上万人同时上线,连接到同一台跑着Lotus Server 的Linux 服务器。

可伸缩网络编程这个领域其实近十年来没什么新东西,POSA2 已经进行了相当全面的总结,另外以下几篇文章也值得参考。

• http://bulk.fefe.de/scalable-networking.pdf

• http://www.kegel.com/c10k.html

• http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

表6-1 是笔者总结的12 种常见方案。其中“互通”指的是如果开发chat 服务,多个客户连接之间是否能方便地交换数据(chat 也是附录A 中举的三大TCP 网络编程案例之一)。对于echo/httpd/Sudoku 这类“连接相互独立”的服务程序,这个功能无足轻重,但是对于chat 类服务却至关重要。“顺序性”指的是在httpd/Sudoku这类请求响应服务中,如果客户连接顺序发送多个请求,那么计算得到的多个响应是否按相同的顺序发还给客户(这里指的是在自然条件下,不含刻意同步)。

 

UNP CSDA 方案归入0 _ 5。方案5 也是目前用得很多的单线程Reactor 方案,muduo 对此提供了很好的支持。方案6 和方案7 其实不是实用的方案,只是作为过渡品。方案8 和方案9 是本文重点介绍的方案,其实这两个方案已经在§3.3 “多线程服务器的常用编程模型”中提到过,只不过当时没有用具体的代码示例来说明。

在对比各方案之前,我们先看看基本的micro benchmark 数据(前两项由Thread_bench.cc 测得,第三项由BlockingQueue_bench.cc 测得,硬件为E5320,内核Linux 2.6.32):

• fork()+exit(): 534.7μs。

• pthread_create()+pthread_join(): 42.5μs,其中创建线程用了26.1μs。

• push/pop a blocking queue : 11.5μs。

• Sudoku resolve: 100us (根据题目难度不同,浮动范围20_200μs)。

 

方案0 这其实不是并发服务器,而是iterative 服务器,因为它一次只能服务一个客户。代码见[UNP] 中的Figure 1.9,[UNP] 以此为对比其他方案的基准点。这个方案不适合长连接,倒是很适合daytime 这种write-only 短连接服务。以下Python代码展示用方案0 实现echo server 的大致做法(本章的Python 代码均没有考虑错误处理):

recipes/python/echo-iterative.py

3 import socket

4

5 def handle(client_socket, client_address):

6 while True:

7 data = client_socket.recv(4096)

8 if data:

9 sent = client_socket.send(data) # sendall?

10 else:

11 print "disconnect", client_address

12 client_socket.close()

13 break

14

15 if __name__ == "__main__":

16 listen_address = ("0.0.0.0", 2007)

17 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

18 server_socket.bind(listen_address)

19 server_socket.listen(5)

20

21 while True:

22 (client_socket, client_address) = server_socket.accept()

23 print "got connection from", client_address

24 handle(client_socket, client_address)

recipes/python/echo-iterative.py

L6~L13 是echo 服务的“业务逻辑循环”,从L21~L24 可以看出它一次只能服务一个客户连接。后面列举的方案都是在保持这个循环的功能不变的情况下,设法能高效地同时服务多个客户端。L9 代码值得商榷,或许应该用sendall() 函数,以确保完整地发回数据。

 

方案1 这是传统的Unix 并发网络编程方案,[UNP] 称之为child-per-client 或fork()-per-client,另外也俗称process-per-connection。这种方案适合并发连接数不大的情况。至今仍有一些网络服务程序用这种方式实现,比如PostgreSQL 和Perforce的服务端。这种方案适合“计算响应的工作量远大于fork() 的开销”这种情况,比如数据库服务器。这种方案适合长连接,但不太适合短连接,因为fork() 开销大于求解Sudoku 的用时。

Python 示例如下,注意其中L9~L16 正是前面的业务逻辑循环,self.request 代替了前面的client_socket。ForkingTCPServer 会对每个客户连接新建一个子进程,在子进程中调用EchoHandler.handle(),从而同时服务多个客户端。在这种编程方式中,业务逻辑已经初步从网络框架分离出来,但是仍然和IO 紧密结合。

recipes/python/echo-fork.py

1 #!/usr/bin/python

2

3 from SocketServer import BaseRequestHandler, TCPServer

4 from SocketServer import ForkingTCPServer, ThreadingTCPServer

5

6 class EchoHandler(BaseRequestHandler):

7 def handle(self):

8 print "got connection from", self.client_address

9 while True:

10 data = self.request.recv(4096)

11 if data:

12 sent = self.request.send(data) # sendall?

13 else:

14 print "disconnect", self.client_address

15 self.request.close()

16 break

17

18 if __name__ == "__main__":

19 listen_address = ("0.0.0.0", 2007)

20 server = ForkingTCPServer(listen_address, EchoHandler)

21 server.serve_forever()

recipes/python/echo-fork.py

 

方案2 这是传统的Java 网络编程方案thread-per-connection,在Java 1.4 引入NIO 之前,Java 网络服务多采用这种方案。它的初始化开销比方案1 要小很多,但与求解Sudoku 的用时差不多,仍然不适合短连接服务。这种方案的伸缩性受到线程数的限制,一两百个还行,几千个的话对操作系统的scheduler 恐怕是个不小的负担。

Python 示例如下,只改动了一行代码。ThreadingTCPServer 会对每个客户连接新建一个线程,在该线程中调用EchoHandler.handle()。

$ diff -U2 echo-fork.py echo-thread.py

if __name__ == "__main__":

listen_address = ("0.0.0.0", 2007)

- server = ForkingTCPServer(listen_address, EchoHandler)

+ server = ThreadingTCPServer(listen_address, EchoHandler)

server.serve_forever()

这里再次体现了将“并发策略”与业务逻辑(EchoHandler.handle())分离的思

路。用同样的思路重写方案0 的代码,可得到:

$ diff -U2 echo-fork.py echo-single.py

if __name__ == "__main__":

listen_address = ("0.0.0.0", 2007)

- server = ForkingTCPServer(listen_address, EchoHandler)

+ server = TCPServer(listen_address, EchoHandler)

server.serve_forever()

 

方案3 这是针对方案1 的优化,[UNP] 详细分析了几种变化,包括对accept(2)“惊群”问题(thundering herd)的考虑。

方案4 这是对方案2 的优化,[UNP] 详细分析了它的几种变化。方案3 和方案4 这两个方案都是Apache httpd 长期使用的方案。

以上几种方案都是阻塞式网络编程,程序流程(thread of control)通常阻塞在read() 上,等待数据到达。但是TCP 是个全双工协议,同时支持read() 和write()操作,当一个线程/进程阻塞在read() 上,但程序又想给这个TCP 连接发数据,那该怎么办?比如说echo client,既要从stdin 读,又要从网络读,当程序正在阻塞地读网络的时候,如何处理键盘输入?

又比如proxy,既要把连接a 收到的数据发给连接b,又要把从b 收到的数据发给a,那么到底读哪个?(proxy 是附录A 讲的三大TCP 网络编程案例之一。)

一种方法是用两个线程/进程,一个负责读,一个负责写。[UNP] 也在实现echoclient 时介绍了这种方案。§7.13 举了一个Python 双线程TCP relay 的例子,另外见Python Pinhole 的代码:http://code.activestate.com/recipes/114642/。

另一种方法是使用IO multiplexing,也就是select/poll/epoll/kqueue 这一系列的“多路选择器”,让一个thread of control 能处理多个连接。“IO 复用”其实复用的不是IO 连接,而是复用线程。使用select/poll 几乎肯定要配合non-blockingIO,而使用non-blocking IO 肯定要使用应用层buffer,原因见§7.4 。这就不是一件轻松的事儿了,如果每个程序都去搞一套自己的IO multiplexing 机制(本质是event-driven 事件驱动),这是一种很大的浪费。感谢Doug Schmidt 为我们总结出了Reactor 模式,让event-driven 网络编程有章可循。继而出现了一些通用的Reactor框架/库,比如libevent、muduo、Netty、twisted、POE 等等。有了这些库,我想基本不用去编写阻塞式的网络程序了(特殊情况除外,比如proxy 流量限制)。

这里先用一小段Python 代码简要地回顾“以IO multiplexing 方式实现并发echo server”的基本做法。为了简单起见,以下代码并没有开启non-blocking,也没有考虑数据发送不完整(L28)等情况。首先定义一个从文件描述符到socket 对象的映射(L14),程序的主体是一个事件循环(L15~L32),每当有IO 事件发生时,就针对不同的文件描述符(fileno)执行不同的操作(L16, L17)。对于listening fd,接受(accept)新连接,并注册到IO 事件关注列表(watch list),然后把连接添加到connections 字典中(L18~L23)。对于客户连接,则读取并回显数据,并处理连接的关闭(L24~L32)。对于echo 服务而言,真正的业务逻辑只有L28:将收到的数据原样发回客户端。

recipes/python/echo-poll.py

6 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

7 server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

8 server_socket.bind(('', 2007))

9 server_socket.listen(5)

10 # server_socket.setblocking(0)

11 poll = select.poll() # epoll() should work the same

12 poll.register(server_socket.fileno(), select.POLLIN)

13

14 connections = {}

15 while True:

16 events = poll.poll(10000) # 10 seconds

17 for fileno, event in events:

18 if fileno == server_socket.fileno():

19 (client_socket, client_address) = server_socket.accept()

20 print "got connection from", client_address

21 # client_socket.setblocking(0)

22 poll.register(client_socket.fileno(), select.POLLIN)

23 connections[client_socket.fileno()] = client_socket

24 elif event & select.POLLIN:

25 client_socket = connections[fileno]

26 data = client_socket.recv(4096)

27 if data:

28 client_socket.send(data) # sendall() partial?

29 else:

30 poll.unregister(fileno)

31 client_socket.close()

32 del connections[fileno]

recipes/python/echo-poll.py

注意以上代码不是功能完善的IO multiplexing 范本,它没有考虑错误处理,也没有实现定时功能,而且只适合侦听(listen)一个端口的网络服务程序。如果需要侦听多个端口,或者要同时扮演客户端,那么代码的结构需要推倒重来。

这个代码骨架可用于实现多种TCP 服务器。例如写一个聊天服务只需改动3 行代码,如下所示。业务逻辑是L28~L30:将本连接收到的数据转发给其他客户连接。

$ diff echo-poll.py chat-poll.py -U4

--- echo-poll.py 2012-08-20 08:50:49.000000000 +0800

+++ chat-poll.py 2012-08-20 08:50:49.000000000 +0800

23 elif event & select.POLLIN:

24 clientsocket = connections[fileno]

25 data = clientsocket.recv(4096)

26 if data:

27 - clientsocket.send(data) # sendall() partial?

28 + for (fd, othersocket) in connections.iteritems():

29 + if othersocket != clientsocket:

30 + othersocket.send(data) # sendall() partial?

31 else:

32 poll.unregister(fileno)

33 clientsocket.close()

34 del connections[fileno]

但是这种把业务逻辑隐藏在一个大循环中的做法其实不利于将来功能的扩展,我们能不能设法把业务逻辑抽取出来,与网络基础代码分离呢?

Doug Schmidt 指出,其实网络编程中有很多是事务性(routine)的工作,可以提取为公用的框架或库,而用户只需要填上关键的业务逻辑代码,并将回调注册到框架中,就可以实现完整的网络服务,这正是Reactor 模式的主要思想。如果用传统Windows GUI 消息循环来做一个类比,那么我们前面展示IO multiplexing的做法相当于把程序的全部逻辑都放到了窗口过程(WndProc)的一个巨大的switch-case 语句中,这种做法无疑是不利于扩展的。(各种GUI 框架在此各显神通。)

1 LRESULT CALLBACK WndProc(HWND hwnd, UINT message, WPARAM wParam, LPARAM lParam)

2 {

3 switch (message)

4 {

5 case WM_DESTROY:

6 PostQuitMessage(0);

7 return 0;

8 // many more cases

9 }

10 return DefWindowProc (hwnd, message, wParam, lParam) ;

11 }

而Reactor 的意义在于将消息(IO 事件)分发到用户提供的处理函数,并保持网络部分的通用代码不变,独立于用户的业务逻辑。

单线程Reactor 的程序执行顺序如图6-11 (左图)所示。在没有事件的时候,线程等待在select/poll/epoll_wait 等函数上。事件到达后由网络库处理IO,再把消息通知(回调)客户端代码。Reactor 事件循环所在的线程通常叫IO 线程。通常由网络库负责读写socket,用户代码负载解码、计算、编码。

注意由于只有一个线程,因此事件是顺序处理的,一个线程同时只能做一件事情。在这种协作式多任务中,事件的优先级得不到保证,因为从“poll 返回之后”到“下一次调用poll 进入等待之前”这段时间内,线程不会被其他连接上的数据或事件抢占(见图6-11 的右图)。如果我们想要延迟计算(把compute() 推迟100ms),那么也不能用sleep() 之类的阻塞调用,而应该注册超时回调,以避免阻塞当前IO 线程。

 

 

方案5 基本的单线程Reactor 方案(见图6-11),即前面的server_basic.cc 程序。本文以它作为对比其他方案的基准点。这种方案的优点是由网络库搞定数据收发,程序只关心业务逻辑;缺点在前面已经谈了:适合IO 密集的应用,不太适合CPU 密集的应用,因为较难发挥多核的威力。另外,与方案2 相比,方案5 处理网络消息的延迟可能要略大一些,因为方案2 直接一次read(2) 系统调用就能拿到请求数据,而方案5 要先poll(2) 再read(2),多了一次系统调用。

这里用一小段Python 代码展示Reactor 模式的雏形。为了节省篇幅,这里直接使用了全局变量,也没有处理异常。程序的核心仍然是事件循环(L42~L46),与前面不同的是,事件的处理通过handlers 转发到各个函数中,不再集中在一坨。例如listening fd 的处理函数是handle_accept,它会注册客户连接的handler。普通客户连接的处理函数是handle_request,其中又把连接断开和数据到达这两个事件分开,后者由handle_input 处理。业务逻辑位于单独的handle_input 函数,实现了分离。

recipes/python/echo-reactor.py

6 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

7 server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

8 server_socket.bind(('', 2007))

9 server_socket.listen(5)

10 # serversocket.setblocking(0)

11

12 poll = select.poll() # epoll() should work the same

13 connections = {}

14 handlers = {}

15

16 def handle_input(socket, data):

17 socket.send(data) # sendall() partial?

18

19 def handle_request(fileno, event):

20 if event & select.POLLIN:

21 client_socket = connections[fileno]

22 data = client_socket.recv(4096)

23 if data:

24 handle_input(client_socket, data)

25 else:

26 poll.unregister(fileno)

27 client_socket.close()

28 del connections[fileno]

29 del handlers[fileno]

30

31 def handle_accept(fileno, event):

32 (client_socket, client_address) = server_socket.accept()

33 print "got connection from", client_address

34 # client_socket.setblocking(0)

35 poll.register(client_socket.fileno(), select.POLLIN)

36 connections[client_socket.fileno()] = client_socket

37 handlers[client_socket.fileno()] = handle_request

38

39 poll.register(server_socket.fileno(), select.POLLIN)

40 handlers[server_socket.fileno()] = handle_accept

41

42 while True:

43 events = poll.poll(10000) # 10 seconds

44 for fileno, event in events:

45 handler = handlers[fileno]

46 handler(fileno, event)

recipes/python/echo-reactor.py

如果要改成聊天服务,重新定义handle_input 函数即可,程序的其余部分保持不变。

$ diff echo-reactor.py chat-reactor.py -U1

def handle_input(socket, data):

- socket.send(data) # sendall() partial?

+ for (fd, other_socket) in connections.iteritems():

+ if other_socket != socket:

+ other_socket.send(data) # sendall() partial?

必须说明的是,完善的非阻塞IO 网络库远比上面的玩具代码复杂,需要考虑各种错误场景。特别是要真正接管数据的收发,而不是像上面的示例那样直接在事件处理回调函数中发送网络数据。

注意在使用非阻塞IO +事件驱动方式编程的时候,一定要注意避免在事件回调中执行耗时的操作,包括阻塞IO 等,否则会影响程序的响应。这和Windows GUI消息循环非常类似。

 

方案6 这是一个过渡方案,收到Sudoku 请求之后,不在Reactor 线程计算,而是创建一个新线程去计算,以充分利用多核CPU。这是非常初级的多线程应用,因为它为每个请求(而不是每个连接)创建了一个新线程。这个开销可以用线程池来避免,即方案8。这个方案还有一个特点是out-of-order,即同时创建多个线程去计算同一个连接上收到的多个请求,那么算出结果的次序是不确定的,可能第2 个Sudoku 比较简单,比第1 个先算出结果。这也是我们在一开始设计协议的时候使用了id 的原因,以便客户端区分response 对应的是哪个request。

方案7 为了让返回结果的顺序确定,我们可以为每个连接创建一个计算线程,每个连接上的请求固定发给同一个线程去算,先到先得。这也是一个过渡方案,因为并发连接数受限于线程数目,这个方案或许还不如直接使用阻塞IO 的thread-per-connection 方案2。

方案7 与方案6 的另外一个区别是单个client 的最大CPU 占用率。在方案6 中,一个TCP 连接上发来的一长串突发请求(burst requests)可以占满全部8 个core;而在方案7 中,由于每个连接上的请求固定由同一个线程处理,那么它最多占用12.5%的CPU 资源。这两种方案各有优劣,取决于应用场景的需要(到底是公平性重要还是突发性能重要)。这个区别在方案8 和方案9 中同样存在,需要根据应用来取舍。

 

方案8 为了弥补方案6 中为每个请求创建线程的缺陷,我们使用固定大小线程池,程序结构如图6-12 所示。全部的IO 工作都在一个Reactor 线程完成,而计算任务交给thread pool。如果计算任务彼此独立,而且IO 的压力不大,那么这种方案是非常适用的。Sudoku Solver 正好符合。代码参见:examples/sudoku/server_threadpool.cc。

 

方案8 使用线程池的代码与单线程Reactor 的方案5 相比变化不大,只是把原来onMessage() 中涉及计算和发回响应的部分抽出来做成一个函数,然后交给ThreadPool 去计算。记住方案8 有乱序返回的可能,客户端要根据id 来匹配响应。

$ diff server_basic.cc server_threadpool.cc -u

--- server_basic.cc 2012-04-20 20:19:56.000000000 +0800

+++ server_threadpool.cc 2012-06-10 22:15:02.000000000 +0800

@@ -96,16 +100,7 @@ void onMessage(const TcpConnectionPtr& conn, ...

if (puzzle.size() == implicit_cast<size_t>(kCells))

{

- string result = solveSudoku(puzzle);

- if (id.empty())

- {

- conn->send(result+"\r\n");

- }

- else

- {

- conn->send(id+":"+result+"\r\n");

- }

+ threadPool_.run(boost::bind(&solve, conn, puzzle, id));

}

@@ -114,17 +109,40 @@

+ static void solve(const TcpConnectionPtr& conn,

+ const string& puzzle,

+ const string& id)

+ {

+ string result = solveSudoku(puzzle);

+ if (id.empty())

+ {

+ conn->send(result+"\r\n");

+ }

+ else

+ {

+ conn->send(id+":"+result+"\r\n");

+ }

+ }

+

EventLoop* loop_;

TcpServer server_;

+ ThreadPool threadPool_;

Timestamp startTime_;

};

线程池的另外一个作用是执行阻塞操作。比如有的数据库的客户端只提供同步访问,那么可以把数据库查询放到线程池中,可以避免阻塞IO 线程,不会影响其他客户连接,就像Java Servlet 2.x 的做法一样。另外也可以用线程池来调用一些阻塞的IO 函数,例如fsync(2)/fdatasync(2),这两个函数没有非阻塞的版本。

如果IO 的压力比较大,一个Reactor 处理不过来,可以试试方案9,它采用多个Reactor 来分担负载。

方案9 这是muduo 内置的多线程方案,也是Netty 内置的多线程方案。这种方案的特点是one loop per thread,有一个main Reactor 负责accept(2) 连接,然后把连接挂在某个sub Reactor 中(muduo 采用round-robin 的方式来选择sub Reactor),这样该连接的所有操作都在那个sub Reactor 所处的线程中完成。多个连接可能被分派到多个线程中,以充分利用CPU。

muduo 采用的是固定大小的Reactor pool,池子的大小通常根据CPU 数目确定,也就是说线程数是固定的,这样程序的总体处理能力不会随连接数增加而下降。另外,由于一个连接完全由一个线程管理,那么请求的顺序性有保证,突发请求也不会占满全部8 个核(如果需要优化突发请求,可以考虑方案11)。这种方案把IO 分派给多个线程,防止出现一个Reactor 的处理能力饱和。

与方案8 的线程池相比,方案9 减少了进出thread pool 的两次上下文切换,在把多个连接分散到多个Reactor 线程之后,小规模计算可以在当前IO 线程完成并发回结果,从而降低响应的延迟。我认为这是一个适应性很强的多线程IO 模型,因此把它作为muduo 的默认线程模型(见图6-13)。

 

方案9 代码见:examples/sudoku/server_multiloop.cc。它与server_basic.cc 的区别很小,最关键的只有一行代码:server_.setThreadNum(numThreads);

$ diff server_basic.cc server_multiloop.cc -up

--- server_basic.cc 2011-06-15 13:40:59.000000000 +0800

+++ server_multiloop.cc 2011-06-15 13:39:53.000000000 +0800

@@ -21,19 +21,22 @@ class SudokuServer

- SudokuServer(EventLoop* loop, const InetAddress& listenAddr)

+ SudokuServer(EventLoop* loop, const InetAddress& listenAddr, int numThreads)

: loop_(loop),

server_(loop, listenAddr, "SudokuServer"),

startTime_(Timestamp::now())

{

server_.setConnectionCallback(

boost::bind(&SudokuServer::onConnection, this, _1));

server_.setMessageCallback(

boost::bind(&SudokuServer::onMessage, this, _1, _2, _3));

+ server_.setThreadNum(numThreads);

}

方案10 这是Nginx 的内置方案。如果连接之间无交互,这种方案也是很好的选择。工作进程之间相互独立,可以热升级。

方案11 把方案8 和方案9 混合,既使用多个Reactor 来处理IO,又使用线程池来处理计算。这种方案适合既有突发IO (利用多线程处理多个连接上的IO),又

有突发计算的应用(利用线程池把一个连接上的计算任务分配给多个线程去做),见图6-14。

 

这种方案看起来复杂,其实写起来很简单,只要把方案8 的代码加一行server_.setThreadNum(numThreads); 就行,这里就不举例了。

一个程序到底是使用一个event loop 还是使用多个event loops 呢?ZeroMQ 的手册给出的建议是,按照每千兆比特每秒的吞吐量配一个event loop 的比例来设置event loop 的数目,即muduo::TcpServer::setThreadNum() 的参数。依据这条经验规则,在编写运行于千兆以太网上的网络程序时,用一个event loop 就足以应付网络IO。如果程序本身没有多少计算量,而主要瓶颈在网络带宽,那么可以按这条规则来办,只用一个event loop。另一方面,如果程序的IO 带宽较小,计算量较大,而且对延迟不敏感,那么可以把计算放到thread pool 中,也可以只用一个event loop。

值得指出的是,以上假定了TCP 连接是同质的,没有优先级之分,我们看重的是服务程序的总吞吐量。但是如果TCP 连接有优先级之分,那么单个event loop 可能不适合,正确的做法是把高优先级的连接用单独的event loop 来处理。

在muduo 中,属于同一个event loop 的连接之间没有事件优先级的差别。我这么设计的原因是为了防止优先级反转。比方说一个服务程序有10 个心跳连接,有10 个数据请求连接,都归属同一个event loop,我们认为心跳连接有较高的优先级,心跳连接上的事件应该优先处理。但是由于事件循环的特性,如果数据请求连接上的数据先于心跳连接到达(早到1ms),那么这个event loop 就会调用相应的eventhandler 去处理数据请求,而在下一次epoll_wait() 的时候再来处理心跳事件。因此在同一个event loop 中区分连接的优先级并不能达到预想的效果。我们应该用单独的event loop 来管理心跳连接,这样就能避免数据连接上的事件阻塞了心跳事件,因为它们分属不同的线程。

 

结语

我在§3.3 曾写道:

总结起来, 我推荐的C++ 多线程服务端编程模式为:one loop per thread +thread pool。

• event loop 用作non-blocking IO 和定时器。

• thread pool 用来做计算,具体可以是任务队列或生产者消费者队列。

当时(2010 年2 月)写这篇博客时我还说:“以这种方式写服务器程序,需要一个优质的基于Reactor 模式的网络库来支撑,我只用过in-house 的产品,无从比较并推荐市面上常见的C++ 网络库,抱歉。”

现在有了muduo 网络库,我终于能够用具体的代码示例把自己的思想完整地表达出来了。归纳一下,实用的方案有5 种,muduo 直接支持后4 种,见表6-2。

 

表6-2 中的N 表示并发连接数目,C1 和C2 是与连接数无关、与CPU 数目有关的常数。

我再用银行柜台办理业务为比喻,简述各种模型的特点。银行有旋转门,办理业务的客户人员从旋转门进出(IO);银行也有柜台,客户在柜台办理业务(计算)。要想办理业务,客户要先通过旋转门进入银行;办理完之后,客户要再次通过旋转门离开银行。一个客户可以办理多次业务,每次都必须从旋转门进出(TCP 长连接)。另外,旋转门一次只允许一个客户通过(无论进出),因为read()/write() 只能同时调用其中一个。

方案5这间小银行有一个旋转门、一个柜台,每次只允许一名客户办理业务。而且当有人在办理业务时,旋转门是锁住的(计算和IO 在同一线程)。为了维持工作效率,银行要求客户应该尽快办理业务,最好不要在取款的时候打电话去问家里人密码,也不要在通过旋转门的时候停下来系鞋带,这都会阻塞其他堵在门外的客户。如果客户很少,这是很经济且高效的方案;但是如果场地较大(多核),则这种布局就浪费了不少资源,只能并发(concurrent)不能并行(parallel)。如果确实一次办不完,应该离开柜台,到门外等着,等银行通知再来继续办理(分阶段回调)。

方案8:这间银行有一个旋转门,一个或多个柜台。银行进门之后有一个队列,客户在这里排队到柜台(线程池)办理业务。即在单线程Reactor 后面接了一个线程池用于计算,可以利用多核。旋转门基本是不锁的,随时都可以进出。但是排队会消耗一点时间,相比之下,方案5 中客户一进门就能立刻办理业务。另外一种做法是线程池里的每个线程有自己的任务队列,而不是整个线程池共用一个任务队列。这样的好处是避免全局队列的锁争用,坏处是计算资源有可能分配不平均,降低并行度。

方案9这间大银行相当于包含方案5 中的多家小银行,每个客户进大门的时候就被固定分配到某一间小银行中,他的业务只能由这间小银行办理,他每次都要进出小银行的旋转门。但总体来看,大银行可以同时服务多个客户。这时同样要求办理业务时不能空等(阻塞),否则会影响分到同一间小银行的其他客户。而且必要的时候可以为VIP 客户单独开一间或几间小银行,优先办理VIP 业务。这跟方案5 不同,当普通客户在办理业务的时候,VIP 客户也只能在门外等着(见图6-11 的右图)。这是一种适应性很强的方案,也是muduo 原生的多线程IO 模型。

方案11这间大银行有多个旋转门,多个柜台。旋转门和柜台之间没有一一对应关系,客户进大门的时候就被固定分配到某一旋转门中(奇怪的安排,易于实现线程安全的IO,见§4.6),进入旋转门之后,有一个队列,客户在此排队到柜台办理业务。这种方案的资源利用率可能比方案9 更高,一个客户不会被同一小银行的其他客户阻塞,但延迟也比方案9 略大。

Linux 多线程服务端编程:使用muduo C++ 网络库(excerpt)

http://www.chenshuo.com/book/__

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics