`

教你开发一个高效的服务器

阅读更多

教你开发一个高效的服务器

转载请注明作者信息
作者:lijiuwei
邮箱:lijiuwei0902@gmail.com

高效服务器的关键技术:
1.需要有固定数量的线程来去处理请求,而不可以每次收到请求都fork或者create_thread;
2.如果socket使用堵塞模式,那么read必须要有超时,如果使用非堵塞模式,那么read就必须要有数据缓冲;
3.使用epoll分别在不同的线程中监视可读和可写;
4.对于tcp长连接使用主线程分配连接,然后由任务线程处理,一个任务线程对应一个任务队列;对于tcp短连接和udp连接,使用竞争线程;
5.时刻检测并且关闭超时连接;
6.真正关闭socket的地方只能由一处,其他地方发现其socket需要关闭只能标识socket应该关闭了,然后由那一处安全的关闭socket;
7.当服务器需要不断的发送硬盘上的数据给客户端(比如说文件传输服务器和流媒体服务器),那么服务器端必须先从硬盘加载数据到数据缓存,再从数据缓冲发送数据给客户端;
8.内存分配必须成功,不然线程睡眠然后重新分配直到分配成功为止;

通用设计方案:
现在我们设计一个通用的服务器,这个服务器接受客户端的连接,然后不断读取客户端的请求,在某一时刻又转为不断的发送数据给客户端;下面我们看看怎么设计这样的一个服务器

注:
1.为了只关注设计方面并且让代码尽量简洁,对于一些错误处理我删除了。
2.must_malloc,must_calloc和其他前缀为must的函数封装了相应的内存分配函数,使得分配必须成功;
3.queue_t结构是一个通用的任务队列结构,queue_new是创建新的队列,queue_push是推数据到队列中,queue_pop是从队列中抛出数据
4.下面的代码只是一个良好的服务器架构,而不是一个真正能使用的服务器
5.欢迎讨论和优化!

 

typedef struct server_st *server_t;
typedef struct task_queue_st *task_queue_t;
typedef struct conn_st *conn_t;

/** 服务器结构 */
struct server_st {
	/** 监听地址 */
	struct in_addr listen_addr;

	/** 监听端口 */
	int port;

	/** 监听描述符 */
	int listen_fd;

	/**负责读取请求的任务队列,一个线程负责处理一个任务队列,一共五个*/
	queue_t readreq_task_queues[5];

	/**负责写数据的任务队列,一个线程负责处理一个任务队列,一共五个*/
	queue_t writedata_task_queues[5];
};

/** 任务队列结构 */
struct task_queue_st {
	/** 待添加的连接 */
	queue_t pending_conn_queue;

	/** 互斥锁,用于多线程环境下对任务队列的访问控制 */
	pthread_mutex_t task_queue_mutex;

	/** 链表结构,保存任务队列中的所有连接 */
	conn_t conn_head;

	/** 链表的最后一个节点,用于方便在结尾插入新的节点 */
	conn_t conn_tail;

	/** 总连接数*/
	int total_conns;
};

/** 用户的阶段状态 */
typedef enum {
	status_ESTABLISHED,/** 客户端刚建立连接 */
	status_CONNECTED,/** 客户端正在连接 */
	status_START_RECV_DATA,/** 客户端开始接收数据,即表示服务器端开始发送数据 */
	status_CLOSED/** 客户端关闭连接 */
} client_status_t;

/** 服务器的阶段状态 */
typedef enum {
	status_SERVER_CONNECTED,/** 与客户端建立连接 */
	status_SERVER_CLOSING /** 服务器关闭连接 */
} server_status_t;

/** 连接结构 */
struct conn_st {
	/** 该连接的所属服务器 */
	server_t server;

	/** 该连接的引用计数 */
	int ref_count;

	/** 用户最后一次活动的时间*/
	time_t last_activity;

	/** 客户端当前的状态阶段*/
	client_status_t client_status;

	/** 服务器当前的状态阶段*/
	server_status_t server_status;

	/** socket描述符*/
	int fd;

	/** 数据缓冲,当服务器端需要不断的发送数据给客户端时被用到 */
	char *remain_data;

	/** 数据缓冲剩余数据大小 */
	int remain_data_size;

	/** 下一个连接 */
	conn_t readreq_task_queue_next;

	/** 下一个连接 */
	conn_t writedata_task_queue_next;
};

/** 主函数 */
int main(int argc, char **argv) {
	int idx, conn_fd;
	server_t server;

	server = must_calloc(1, sizeof(struct server_st));
	server->port = 端口号;
	server->listen_addrs.s_addr = INADDR_ANY;
	server->listen_fd = 服务器监听(&server->listen_addr,server->port);
	设置成非堵塞(server->listen_fd);

	/* 创建5个负责读取请求的任务线程 */
	for (idx = 0; idx < 5; idx++) {
		task_queue_t task_queue = server->readreq_task_queues[idx];

		bzero(task_queue,sizeof(struct task_queue_st));
		task_queue->pending_conn_queue = queue_new();
		pthread_mutex_init(&task_queue->task_queue_mutex,NULL);

		pthread_create(&task_queue->thread_tid, NULL, readreq_task_thread, task_queue);
	}

	/* 创建5个负责写数据的任务线程 */
	for (idx = 0; idx < 5; idx++) {
		task_queue_t task_queue = server->writedata_task_queues[idx];

		bzero(task_queue,sizeof(struct task_queue_st));
		task_queue->pending_conn_queue = queue_new();
		pthread_mutex_init(&task_queue->task_queue_mutex,NULL);

		pthread_create(&task_queue->thread_tid, NULL, writedata_task_thread, task_queue);
	}

	/* 无限循环的监听,等待客户端的连接请求 */
	while (1) {
		conn_fd = accept(server->listen_fd, 0, 0);
		if (conn_fd < 0) continue;

		/* 和一个客户端建立了连接,下面开始把该连接分配给负责读取请求的任务线程*/
		assign_to_readreq_task_thread(server,conn_fd);
	}

	return 0;
}


/*分配任务,原则为哪个线程的任务队列最少就分配给哪一个线程去处理*/
static void assign_to_readreq_task_thread(server_t server, int conn_fd) {
	int idx;
	task_queue_t task_queue, min_task_queue = NULL;
	conn_t conn;

	/* 查找连接数最少的任务队列 */
	for (idx = 0; idx < 5; idx++) {
		task_queue = server->readreq_task_queues[idx];

		if (min_task_queue == NULL)
			min_task_queue = task_queue;
		else if (min_task_queue->total_conns > task_queue->total_conns)
			min_task_queue = task_queue;
	}

	if (min_task_queue) {
		conn = must_calloc(1, sizeof(struct conn_st));

		conn->server = server;
		conn->client_status = status_ESTABLISHED;
		conn->server_status = status_SERVER_CONNECTED;
		conn->ref_count++;/*累加引用计数*/

		/*添加到该任务队列中的待添加队列并且累加该任务队列里的连接数*/
		pthread_mutex_lock(&min_task_queue->task_queue_mutex);
		queue_push(min_task_queue->pending_conn_queue, conn, 1);
		min_task_queue->total_conns++;
		pthread_mutex_unlock(&min_task_queue->task_queue_mutex);
	}
}

/**
*负责读取请求的任务线程处理主函数,负责管理,处理自己队列中的所有连接
*/
static void *readreq_task_thread(void *arg) {
	int epfd = epoll_create(20480);
	struct epoll_event ev,happened_ev[128];
	task_queue_t task_queue = (task_queue_t) arg;
	int ready,i;

	while (1) {
		time_t curtime = time(NULL);
		/*下面三个变量遍历链表时使用,conn是当前连接,prev_conn是先前的连接,discarded_conn是丢弃的连接*/
		conn_t conn = NULL, prev_conn = NULL, discarded_conn;

		/*把待添加队列里的连接添加到任务队列中*/
		pthread_mutex_lock(&task_queue->task_queue_mutex);
		while ((conn = queue_pop(task_queue->pending_conn_queue))) {
			if (!task_queue->conn_head) {
				task_queue->conn_head = conn;
			} else {
				task_queue->conn_tail->readreq_task_queue_next = conn;
			}
			task_queue->conn_tail = conn;
		}
		pthread_mutex_unlock(&task_queue->task_queue_mutex);

		if (task_queue->total_conns == 0) {
			thread_sleep(1);
			continue;
		}

		conn = task_queue->conn_head;
		while (conn) {
			//清理已经关闭的连接,把已经关闭的连接和超时的连接从链表中删除掉
			boolean conn_timeout = conn->last_activity > 0 && curtime - conn->last_activity > CLIENT_TIMEOUT;

			if ((conn->client_status == status_CLOSED || conn->server_status == status_SERVER_CLOSING || conn_timeout) && conn->ref_count == 1) {
				task_queue->total_conns--;

				discarded_conn = conn;//先放入临时变量中

				//从链表中删除
				if (prev_conn) {
					if (!conn->readreq_task_queue_next) {
						task_queue->conn_tail = prev_conn;
						prev_conn->readreq_task_queue_next = NULL;
					} else {
						prev_conn->readreq_task_queue_next = conn->readreq_task_queue_next;
					}
				} else {
					task_queue->conn_head = conn->readreq_task_queue_next;
				}
				conn = conn->readreq_task_queue_next;

				close(discarded_conn->fd);/*真正关闭连接的地方*/
				free(discarded_conn);
			} else {
				ev.data.fd = conn->fd;
				ev.data.ptr = conn;
				/*只负责监视读事件和出错事件*/
				ev.events = EPOLLIN | EPOLLET;
				if(conn->client_status == status_ESTABLISHED) {
					设置超时时间或者设置非堵塞(conn->fd);/*两者必须选一个*/
					epoll_ctl(epfd,EPOLL_CTL_ADD,conn->fd,&ev);
					conn->client_status = status_CONNECTED;
				} else {
					epoll_ctl(epfd,EPOLL_CTL_MOD,conn->fd,&ev);
				}
				prev_conn = conn;
				conn = conn->readreq_task_queue_next;
			}
		}

		/** 堵塞等待,超时时间为POLL_TIMEOUT毫秒*/
		if ((ready = epoll_wait(epfd, happened_ev,sizeof(happened_ev) / sizeof(struct epoll_event), POLL_TIMEOUT)) < 0)
			continue;

		/**
		* 循环查找向我们发送请求的客户端的连接
		*/
		for (i = 0; i < ready; i++) {
			if (happened_ev[i].events & POLLIN) {
				conn = (conn_t) happened_ev[i].data.ptr;

				conn->last_activity = time(NULL);/* 记录客户端的最后活动时间*/

				proc_protocol(conn);/*处理协议函数*/
			} else if(happened_ev[i].events & EPOLLHUP) {
				conn = (conn_t) happened_ev[i].data.ptr;

				conn->client_status = status_CLOSED;
			}
		}
	}
}

/* 协议处理函数 */
void proc_protocol(conn_t conn) {
	某一时刻客户端要求服务器端不断发送数据给客户端,然后设置了conn->client_status = status_START_RECV_DATA,并且调用了下面的move_to_writedata_task_thread(conn);
}

/*分配任务,原则为哪个线程的任务队列最少就分配给哪一个线程去处理*/
static void move_to_writedata_task_thread(conn_t conn) {
	server_t server = conn->server;
	task_queue_t task_queue, min_task_queue= NULL;
	int idx;

	/* 查找连接数最少的任务队列 */
	for (idx = 0; idx < 5; idx++) {
		task_queue = server->writedata_task_queues[idx];

		if (min_task_queue == NULL)
			min_task_queue = task_queue;
		else if (min_task_queue->total_conns > task_queue->total_conns)
			min_task_queue = task_queue;
	}

	if (min_task_queue) {
		conn->ref_count++;/*累加引用计数*/

		/*添加到该任务队列中的待添加队列并且累加该任务队列里的连接数*/
		pthread_mutex_lock(&min_task_queue->task_queue_mutex);
		queue_push(min_task_queue->pending_conn_queue, conn, 1);
		min_task_queue->total_conns++;
		pthread_mutex_unlock(&min_task_queue->task_queue_mutex);
	}
}

/**
*负责写数据的线程处理主函数,负责管理,处理自己队列中的所有连接
*/
static void *writedata_task_thread(void *arg) {
	task_queue_t task_queue = (task_queue_t) arg;
	int epfd = epoll_create(20480);
	struct epoll_event ev,happened_ev[128];
	int ready,i,nwrite,nread,sndbuf_size = 10240;
	char buf[sndbuf_size];

	while (1) {
		time_t curtime = time(NULL);
		/*下面三个变量遍历链表时使用,conn是当前连接,prev_conn是先前的连接,discarded_conn是丢弃的连接*/
		conn_t conn = NULL, prev_conn = NULL, discarded_conn;

		/*把待添加队列里的连接添加到任务队列中*/
		pthread_mutex_lock(&task_queue->task_queue_mutex);
		while ((conn = queue_pop(task_queue->pending_conn_queue))) {
			if (!task_queue->conn_head) {
				task_queue->conn_head = conn;
			} else {
				task_queue->conn_tail->writedata_task_queue_next = conn;
			}
			task_queue->conn_tail = conn;
		}
		pthread_mutex_unlock(&task_queue->task_queue_mutex);

		if (task_queue->total_conns == 0) {
			thread_sleep(1);
			continue;
		}

		conn = task_queue->conn_head;
		while (conn) {
			//清理已经关闭的连接,把已经关闭的连接和超时的连接从链表中删除掉
			boolean conn_timeout = conn->last_activity > 0 && curtime - conn->last_activity > CLIENT_TIMEOUT;

			if (conn->client_status == status_CLOSED || conn->server_status == status_SERVER_CLOSING || conn_timeout) {
				task_queue->total_conns--;

				discarded_conn = conn;//放入临时变量中

				//从链表中删除
				if (prev_conn) {
					if (!conn->writedata_task_queue_next) {
						task_queue->conn_tail = prev_conn;
						prev_conn->writedata_task_queue_next = NULL;
					} else {
						prev_conn->writedata_task_queue_next = conn->writedata_task_queue_next;
					}
				} else {
					task_queue->conn_head = conn->writedata_task_queue_next;
				}
				conn = conn->writedata_task_queue_next;

				discarded_conn->ref_count--;/*引用计数减一,并且该连接已经关闭了,那么真正关闭连接的任务就交给readreq_task_thread函数了*/
			} else {
				ev.data.fd = conn->fd;
				ev.data.ptr = conn;
				/*只负责监视写事件和出错事件*/
				ev.events = EPOLLOUT | EPOLLET;
				if(conn->client_status == status_START_RECV_DATA) {
					setsockopt(conn->fd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_size, sizeof(int));
					epoll_ctl(epfd,EPOLL_CTL_ADD,conn->fd,&ev);
					conn->client_status = status_CONNECTED;
				} else {
					epoll_ctl(epfd,EPOLL_CTL_MOD,conn->fd,&ev);
				}
				prev_conn = conn;
				conn = conn->writedata_task_queue_next;
			}
		}

		/**堵塞等待,超时时间为POLL_TIMEOUT毫秒*/
		if ((ready = epoll_wait(epfd, happened_ev,sizeof(happened_ev) / sizeof(struct epoll_event), POLL_TIMEOUT)) < 0)
			continue;

		/**
		* 循环查找向我们发送请求的客户端的连接
		*/
		for (i = 0; i < ready; i++) {
			if (happened_ev[i].events & EPOLLOUT) {
				conn = (conn_t) happened_ev[i].data.ptr;

				conn->last_activity = time(NULL);/* 记录客户端的最后活动时间*/
				if(conn->remain_data_size == 0) {/*如果数据缓存已经没有任何数据了的话*/
					if((nread = read(文件描述符,buf,sizeof(buf))) > 0) {/*从硬盘读数据出来到数据缓存*/
						if((nwrite = write(conn->fd,buf,nread)) < 0 && errno != EINTR) {
							conn->client_status = status_CLOSED;
							continue;
						}

						if(nwrite < nread) {
							conn->remain_data_size = nread - nwrite;
							conn->remain_data = must_malloc(conn->remain_data_size);
							memcpy(conn->remain_data,buf + nwrite, conn->remain_data_size);
						}
					} else if(nread == 0) {
						conn->server_status = status_SERVER_CLOSING;
					}
				} else {
					/*写数据缓存里的数据到客户端*/
					if((nwrite = write(conn->fd,conn->remain_data,conn->remain_data_size)) < 0 && errno != EINTR) {
						conn->client_status = status_CLOSED;
						free(conn->remain_data);
						continue;
					}

					if(nwrite < conn->remain_data_size) {
						conn->remain_data_size = conn->remain_data_size - nwrite;
						memcpy(buf, conn->remain_data + nwrite, conn->remain_data_size);
						conn->remain_data = must_realloc(conn->remain_data, conn->remain_data_size);
						memcpy(conn->remain_data, buf, conn->remain_data_size);
					} else {
						free(conn->remain_data);
						conn->remain_data = NULL;
						conn->remain_data_size = 0;
					}
				}
			} else if(happened_ev[i].events & EPOLLHUP) {
				conn = (conn_t) happened_ev[i].data.ptr;

				conn->client_status = status_CLOSED;
			}
		}
	}
}
 
分享到:
评论

相关推荐

    08-分布式游戏服务器开发.rar

    分布式系统是由多台计算机通过网络连接,彼此之间能够进行通信和协作,共同完成一个任务。在游戏服务器开发中,分布式架构可以提高服务的可扩展性、容错性和性能,确保游戏的稳定运行。 分布式游戏服务器的主要组件...

    OPC开发工具包-数据访问服务器开发工具包.rar_CB OPC_OPC DA_OPC 工具_OPC_VC_www.9926.c

    标题提到的“OPC开发工具包-数据访问服务器开发工具包.rar”是一个专为开发者设计的工具包,它简化了OPC DA(Data Access)服务器的开发流程。OPC DA是OPC规范中最基础的部分,主要用于实时数据访问,允许应用程序从...

    深度解析Java游戏服务器开发源代码

    最后,书中的"JavaServerBookCode-master"很可能是一个完整的项目源码仓库,包含了从简单到复杂的各个层次的游戏服务器实现,读者可以通过阅读和运行这些代码,更直观地理解和掌握Java游戏服务器开发的全过程。...

    用ROS做HTTP服务器教程

    ROS(Robot Operating System)是一个开源操作系统,专门设计用于机器人技术,提供了一整套工具和库,使得机器人软件开发变得更加高效和可移植。本教程将教你如何利用ROS来创建一个HTTP服务器,这个服务器可以用于...

    游戏服务器架设教程

    游戏服务器架设教程 本教程旨在指导用户如何架设游戏服务器,涵盖硬件安装、系统安装、系统设置等方面的知识点。 一、硬件安装 ...只有掌握了这些知识点,才能架设一个稳定、高效的游戏服务器。

    Excel服务器高级版教程

    它强调了Excel服务器不仅是一个数据分析工具,而且是一种能够帮助企业构建和管理信息系统的解决方案。这一章会介绍Excel服务器如何帮助用户创建、存储和共享数据,以及构建基于Excel的工作流和报表。此外,还会涉及...

    Java服务器开发(二)写第一个Servlet

    首先,你需要一个集成开发环境(IDE),这里推荐使用MyEclipse,它是一个功能强大的JavaEE开发工具,集成了创建、调试和部署Servlet的功能。 1. **创建Servlet项目**: - 在MyEclipse中,新建一个Dynamic Web ...

    ASP.NET 服务器控件开发技术与实例

    ASP.NET 服务器控件是微软.NET框架中...通过以上内容,你可以深入了解ASP.NET服务器控件的开发技术,并利用这些知识来构建高效、动态的Web应用程序。在实践中不断探索和学习,将有助于你成为一名出色的ASP.NET开发者。

    《PHP网站开发案例教程》课件

    PHP是一种广泛应用于服务器端的开放源代码编程语言,以其简单易学、高效执行和丰富的功能(如图像处理和数据库访问)而闻名。MySQL数据库系统因其高性能、多线程、跨平台和安全性,成为PHP开发中的常见选择。 配置...

    Photon服务器引擎 入门教程二服务器端源码

    在Unity游戏开发中,Photon服务器引擎是一个非常流行的实时网络通信工具,特别适合构建多人在线游戏。本入门教程将深入探讨如何使用Photon服务器引擎构建服务器端源码,以便处理客户端的请求、进行用户身份验证以及...

    Delphi开发B_S数据库应用系统教程_delphi_DelPhi开发B/S_delphiBS_Delphi开发BS

    教程可能涵盖如何使用Delphi构建一个简单的B/S系统,包括用户登录、数据查询、添加编辑删除记录等功能,以及如何处理错误和异常。 通过本教程的学习,开发者将能够掌握利用Delphi开发B/S数据库应用系统的技能,从而...

    Nginx高性能Web服务器实战教程+高清+完整书签

    《Nginx高性能Web服务器实战教程》是一本深入讲解如何利用Nginx构建高效稳定Web服务的书籍。Nginx以其高性能、轻量级、反向代理和负载均衡等特性,已经成为许多企业和开发者首选的Web服务器。这本书涵盖了从基础配置...

    后端开发入门与实战教程:从0到1打造高效Web应用.zip

    通过这个"后端开发入门与实战教程:从0到1打造高效Web应用",你将全面学习后端开发的各个环节,从理论知识到实践技巧,一步步成长为一名合格的后端开发者。阅读并实践其中的内容,相信你会在Web应用开发的道路上大步...

    配置SVN服务器协同开发步骤 SVN服务器与客户端

    1. **选择SVN服务器软件**: 通常推荐使用VisualSVN-Server,它是一个易于管理和配置的SVN服务器。下载并安装VisualSVN-Server.zip,确保选择正确的操作系统版本(如Windows)。 2. **设置Repositories**: 安装完成...

    ASP.NET3.5网站开发实例教程

    ASP.NET3.5作为.NET框架的一个重要组成部分,引入了许多新特性,如LINQ、ADO.NET Entity Framework等,极大地提升了Web应用程序的开发效率和性能。 ### VWD2008开发环境 Visual Web Developer 2008(简称VWD2008)...

    轻量应用服务器建站例程

    Ubuntu是一个基于Debian的开源Linux发行版,它广泛用于服务器环境,因为其稳定性、安全性和用户友好的包管理系统。 1. **安装Python**:Python是这个建站过程的基础,因为它被用作Django框架的语言。在Ubuntu上,你...

    STM32实现Web服务器

    STM32实现Web服务器是一个将微控制器技术与网络通信相结合的课题,对于嵌入式系统开发者来说具有重要的学习价值。...记得在实践中不断积累经验,理解和优化每一个细节,从而提升你的嵌入式开发能力。

Global site tag (gtag.js) - Google Analytics