`
zhangyafei_kimi
  • 浏览: 266179 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

自己写的IOCP的程序,备忘(2009-02-05更新)

阅读更多
#include <winsock2.h>
#include <mswsock.h>
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include "vld.h"

#pragma message("automatic link to ws2_32.lib and mswsock.lib")
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "mswsock.lib")




#define RETURN_SUCESS (0)
#define RETURN_FAILED (-1)

#define PORT 5150//端口
#define IOCP_THREAD_MAX 16//最大的线程数
#define DATA_BUFSIZE 8192



//区别是何种完成事件
enum IO_EVENT
{
	IO_EVENT_ACCEPT,
	IO_EVENT_WSARECV,
	IO_EVENT_WSARECVFROM,
	//不支持异步发送
	//IO_EVENT_WSASEND,
	//IO_EVENT_WSASENDTO
};




typedef struct overlapped_wrapper
{
	//OVERLAPPED要放在第一个
	OVERLAPPED overlapped;
	int io_type;//指示是何种IO操作
}overlapped_wrapper;



typedef struct acceptex_block
{
	//OVERLAPPED要放在第一个
	OVERLAPPED overlapped;
	int io_type;//指示是何种IO操作

	char buffer[DATA_BUFSIZE];
	SOCKET listen_socket;
	SOCKET accept_socket;
}acceptex_block;



typedef struct recv_block
{
	//OVERLAPPED要放在第一个
	OVERLAPPED overlapped;
	int io_type;//指示是何种IO操作

	char buffer[DATA_BUFSIZE];
	SOCKET socket;
	WSABUF wsa_recv_buf;
	DWORD bytes_recveived;
}recv_block;



typedef struct recvfrom_block
{
	//OVERLAPPED要放在第一个
	OVERLAPPED overlapped;
	int io_type;//指示是何种IO操作

	char buffer[DATA_BUFSIZE];
	SOCKET socket;
	WSABUF wsa_recv_buf;
	DWORD bytes_recveived;

	//UDP包的源地址
	struct sockaddr_in from_address;
	int from_address_len;
}recvfrom_block;






int get_cpu_number();

int async_AcceptEx(acceptex_block* block);
int async_WSARecv(recv_block* block);
int async_WSARecvFrom(recvfrom_block* block);

void on_acceptex(acceptex_block* block);
void on_recv(recv_block* block);
void on_recvfrom(recvfrom_block* block);
void on_tcp_listen_close(acceptex_block* block);
void on_tcp_close(recv_block* block);
void on_udp_close(recvfrom_block* block);

int init(void);
void uninit(void);
DWORD WINAPI worker_thread(LPVOID CompletionPortID);
void exit_error();





//完成端口的句柄
HANDLE g_completion_port = INVALID_HANDLE_VALUE;
//工作线程句柄
HANDLE g_threads[IOCP_THREAD_MAX];
//工作线程数量
int g_threads_number = 0;


int main(void)
{
	/************************************************************************/
	/*TCP的例子*/
	/************************************************************************/
	SOCKADDR_IN internet_address;
	SOCKET listen_socket;
	acceptex_block* block;

	if(RETURN_FAILED == init())
		exit_error();

	if ((listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET)
		exit_error();

	internet_address.sin_family = AF_INET;
	internet_address.sin_addr.s_addr = htonl(INADDR_ANY);
	internet_address.sin_port = htons(PORT);

	if (bind(listen_socket, (PSOCKADDR) &internet_address, sizeof(internet_address)) == SOCKET_ERROR)
		exit_error();

	if (listen(listen_socket, SOMAXCONN) == SOCKET_ERROR)
		exit_error();

	printf("listening socket %d\n", PORT);

	//把监听的SOCKET和完成端口绑定
	if(NULL == CreateIoCompletionPort((HANDLE)listen_socket, g_completion_port, (u_long)listen_socket, 0))
		exit_error();

	block = (acceptex_block*)malloc(sizeof(acceptex_block));
	block->listen_socket = listen_socket;
	async_AcceptEx(block);
	getchar();
	closesocket(listen_socket);
	getchar();
	uninit();
	return 0;



	/************************************************************************/
	/*UDP的例子*/
	/************************************************************************/
	//SOCKADDR_IN internet_address;
	//SOCKET sock;
	//recvfrom_block* block = (recvfrom_block*)malloc(sizeof(recvfrom_block));

	//if(RETURN_FAILED == init())
	//	exit_error();

	//if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == INVALID_SOCKET)
	//	exit_error();

	//internet_address.sin_family = AF_INET;
	//internet_address.sin_addr.s_addr = htonl(INADDR_ANY);
	//internet_address.sin_port = htons(PORT);

	//if (bind(sock, (PSOCKADDR) &internet_address, sizeof(internet_address)) == SOCKET_ERROR)
	//	exit_error();

	//if(NULL == CreateIoCompletionPort((HANDLE)sock, g_completion_port, (u_long)sock, 0))
	//	exit_error();

	//block->socket = sock;
	//async_WSARecvFrom(block);


	//getchar();
	//closesocket(sock);
	//getchar();
	//uninit();
	//return 0;
}




int init(void)
{
	WSADATA wsa_data;
	int i;
#if defined _DEBUG || defined DEBUG
	//调试时用一个线程方便
	int threads = 1;
#else
	int threads = get_cpu_number();
#endif


	if (WSAStartup(0x0202, &wsa_data) != 0)
		return RETURN_FAILED;

	//建立完成端口
	if ((g_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) == NULL)
		return RETURN_FAILED;

	if(threads > IOCP_THREAD_MAX)
		threads = IOCP_THREAD_MAX;
	for(i = 0; i < threads; i++)
	{
		//创建工作线程
		g_threads[g_threads_number++] = CreateThread(NULL, 0, worker_thread, NULL, 0, 0);
	}
	return RETURN_SUCESS;
}



void uninit(void)
{
	//自定义的退出协议,三个参数全为0时退出
	PostQueuedCompletionStatus(g_completion_port, 0, 0, NULL);

	WaitForMultipleObjects(g_threads_number, g_threads, TRUE, INFINITE);

	CloseHandle(g_completion_port);

	WSACleanup();
}


int get_cpu_number()
{
	SYSTEM_INFO system_info;
	GetSystemInfo(&system_info);
	return system_info.dwNumberOfProcessors;
}


void exit_error()
{
	int error = GetLastError();
	if (error == 0)
	{
		exit(RETURN_SUCESS);
	}
	else
	{
		fprintf(stderr, "error:%d\n", error);
		exit(RETURN_FAILED);
	}
}



/*
投递一次AcceptEx请求
返回TRUE,成功
返回FALSE,失败,WSAGetLastError()获取进一步信息
*/
int async_AcceptEx(acceptex_block* block)
{
	DWORD address_length;
	DWORD bytes_received;

	//准备投递一个异步接受请求
	SOCKET accept_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	if(accept_socket == INVALID_SOCKET)
		return RETURN_FAILED;

	block->io_type = IO_EVENT_ACCEPT;
	block->accept_socket = accept_socket;
	memset(&block->overlapped, 0, 
		sizeof(block->overlapped));


	address_length = sizeof(struct sockaddr_in) + 16;
	if(!AcceptEx(
		block->listen_socket, 
		accept_socket,
		block->buffer,//这个参数会传递给完成端口
		0,//DATA_BUFSIZE - i * 2,		//传0进来,接受的连接的时候不接受数据
		address_length, address_length,
		&bytes_received, 
		&block->overlapped))
	{
		if(WSAGetLastError() != ERROR_IO_PENDING)
			goto fail;
	}
	return RETURN_SUCESS;

fail:
	closesocket(accept_socket);
	return RETURN_FAILED;
}








/*
投递一次WSARecv请求
返回TRUE,表示接受请求投递成功
返回FALSE,表示连接已经断开,WSAGetLastError()获取进一步信息
*/
int async_WSARecv(recv_block* block)
{
	int ret;
	DWORD recv_bytes, flags = 0;

	block->io_type = IO_EVENT_WSARECV;
	block->bytes_recveived = 0;
	block->wsa_recv_buf.len = DATA_BUFSIZE;
	block->wsa_recv_buf.buf = block->buffer;
	memset(&(block->overlapped), 0, sizeof(block->overlapped));


	//投递一次接受请求
	ret = WSARecv(block->socket, &block->wsa_recv_buf, 1, &recv_bytes, &flags,
		&(block->overlapped), NULL);

	if(ret == -1 && WSAGetLastError() != ERROR_IO_PENDING)
	{
		printf("WSARecv() error returns %d\n", ret);
		on_tcp_close(block);
		return RETURN_FAILED;
	}
	else if((ret == 0) || (ret == -1 && WSAGetLastError() == ERROR_IO_PENDING))
	{
		//waiting... for next turn
	}
	else
	{
		//如果直接收到了数据
		block->bytes_recveived = ret;
		on_recv(block);
		//递归
		return async_WSARecv(block);
	}

	return RETURN_SUCESS;
}





/*
投递一次WSARecvFrom请求
返回TRUE,表示接受请求投递成功
返回FALSE,表示接受请求投递失败,WSAGetLastError()获取进一步信息
*/
int async_WSARecvFrom(recvfrom_block* block)
{
	int ret;
	DWORD recv_bytes = 0, flags = 0;

	block->io_type = IO_EVENT_WSARECVFROM;
	block->bytes_recveived = 0;
	block->wsa_recv_buf.len = DATA_BUFSIZE;
	block->wsa_recv_buf.buf = block->buffer;
	memset(&block->from_address, 0, sizeof(block->from_address));
	block->from_address_len = sizeof(block->from_address);
	memset(&(block->overlapped), 0, sizeof(block->overlapped));


	//投递一次接受请求
	ret = WSARecvFrom(block->socket, &block->wsa_recv_buf, 1, &recv_bytes, &flags,
		(struct sockaddr*)&block->from_address, &block->from_address_len,
		&(block->overlapped), NULL);

	if(ret == -1 && WSAGetLastError() != ERROR_IO_PENDING)
	{
		printf("WSARecvFrom() error returns %d %d\n", ret, WSAGetLastError());
		on_udp_close(block);
		return RETURN_FAILED;
	}
	else if((ret == 0) || (ret == -1 && WSAGetLastError() == ERROR_IO_PENDING))
	{
		//waiting... for next turn
	}
	else
	{
		//如果直接收到了数据
		block->bytes_recveived = ret;
		on_recvfrom(block);
		//递归
		return async_WSARecvFrom(block);
	}

	return RETURN_SUCESS;
}


void on_acceptex(acceptex_block* block)
{
	DWORD i;
	struct sockaddr *p_local_addr;
	int local_addr_len = sizeof(struct sockaddr_in);
	struct sockaddr *p_remote_addr;
	int remote_addr_len = sizeof(struct sockaddr_in);
	struct sockaddr_in *p_v4_addr;

	recv_block* r_block;


	printf("on_acceptex %d\n", block->accept_socket);

	i = sizeof(struct sockaddr_in) + 16;
	GetAcceptExSockaddrs(
		block->buffer,
		0,//DATA_BUFSIZE - i * 2,
		i, i,
		&p_local_addr,
		&local_addr_len,
		&p_remote_addr,
		&remote_addr_len
		);

	p_v4_addr = (struct sockaddr_in *)p_local_addr;
	printf("\t本地地址%s:%d\n", 
		inet_ntoa(p_v4_addr->sin_addr), ntohs(p_v4_addr->sin_port));
	p_v4_addr = (struct sockaddr_in *)p_remote_addr;
	printf("\t远程地址%s:%d\n", 
		inet_ntoa(p_v4_addr->sin_addr), ntohs(p_v4_addr->sin_port));



	//准备投递一次WSARecv请求
	r_block = (recv_block*)malloc(sizeof(recv_block));
	r_block->socket = block->accept_socket;

	//绑定
	CreateIoCompletionPort((HANDLE)r_block->socket, 
		g_completion_port, (u_long)r_block->socket, 0);

	//投递一次接受请求
	async_WSARecv(r_block);

	//继续投递AcceptEx请求
	async_AcceptEx(block);
}

void on_recv(recv_block* block)
{
	printf("on_recv %d, 收到%d bytes数据\n", block->socket, block->bytes_recveived);

	async_WSARecv(block);
}


void on_recvfrom(recvfrom_block* block)
{
	printf("on_recvfrom %d, 收到%d bytes数据, 来自%s:%d\n",
		block->socket,
		block->bytes_recveived,
		inet_ntoa(block->from_address.sin_addr),
		ntohs(block->from_address.sin_port));

	async_WSARecvFrom(block);
}


void on_tcp_listen_close(acceptex_block* block)
{
	printf("on_tcp_listen_close %d\n", block->accept_socket);
	free(block);
	closesocket(block->accept_socket);
}


void on_tcp_close(recv_block* block)
{
	printf("on_tcp_close %d\n", block->socket);
	free(block);
	closesocket(block->socket);
}


void on_udp_close(recvfrom_block* block)
{
	printf("on_udp_close %d\n", block->socket);
	free(block);
	closesocket(block->socket);
}



DWORD WINAPI worker_thread(LPVOID nothing)
{
	DWORD bytes;
	overlapped_wrapper* over_type;
	BOOL close_socket = FALSE;
	BOOL ret;

	UNREFERENCED_PARAMETER(nothing);

	for(;;)
	{
		SOCKET socket;
		//注意第三个参数,他是CreateIoCompletionPort时传入的,直接传入的是一个SOCKET
		//注意第四个参数,他可能是一个recv_block或acceptex_block结构的指针
		//因为OVERLAPPED是PER_IO_OPERATION_DATA的第一个成员,所以可以安全的进行转换
		ret = GetQueuedCompletionStatus(g_completion_port, &bytes,
			(LPDWORD)&socket, (LPOVERLAPPED *) &over_type, INFINITE);


		if(ret == ERROR_SUCCESS)
		{
			DWORD last_error = GetLastError();

			if(ERROR_INVALID_HANDLE == last_error)
			{
				printf("完成端口被关闭,退出\n");
				return 0;
			}
			else if(ERROR_NETNAME_DELETED == last_error
				|| ERROR_OPERATION_ABORTED == last_error)
			{
				printf("socket被关闭 或者 操作被取消\n");
				close_socket = TRUE;
			}
			else
			{
				printf("GetLastError %d\n", last_error);
				continue;
			}
		}
		//自定义的退出协议,三个参数全为0时退出(见uninit中的PostQueuedCompletionStatus)
		else if(bytes == 0 && socket == 0 && over_type == NULL)
		{
			return 0;
		}

		assert(over_type);

		switch(over_type->io_type)
		{
		case IO_EVENT_ACCEPT:
			{
				acceptex_block* a_block = (acceptex_block*)over_type;

				if(close_socket)
				{
					on_tcp_listen_close(a_block);
				}
				else
				{
					on_acceptex(a_block);
				}
			}
			break;

		case IO_EVENT_WSARECV:
			{
				recv_block* r_block = (recv_block*)over_type;
				//连接断开
				if (close_socket || bytes == 0 || bytes == -1)
				{
					//测试一下,确定对方肯定关闭连接了
					char test_close;
					int r = recv(r_block->socket, &test_close, sizeof(test_close), MSG_PEEK);
					if(r == 0 || r == -1)
					{
						on_tcp_close(r_block);
					}
				}
				//收到了bytes字节的数据
				else
				{
					//处理数据
					r_block->bytes_recveived = bytes;
					on_recv(r_block);
				}
			}
			break;


		case IO_EVENT_WSARECVFROM:
			{
				recvfrom_block* rf_block = (recvfrom_block*)over_type;

				if(close_socket || bytes == -1 || bytes == 0)
				{
					on_udp_close(rf_block);
				}
				else
				{
					//处理数据
					rf_block->bytes_recveived = bytes;
					on_recvfrom(rf_block);
				}
			}
			break;


		default:
			break;
		}
	}
	return 0;
}
分享到:
评论

相关推荐

    IOCP-SRC.zip_IOCP_IOCP C-C

    【标题】"IOCP-SRC.zip_IOCP_IOCP C-C" 涉及的主要知识点是IOCP(I/O完成端口)技术,这是一种在Windows操作系统中实现高效并发I/O操作的方法,尤其适用于处理大量并发连接的服务器应用。在这个压缩包中,我们可以...

    IOCP本质论-----关于IOCP本质的文章

    IOCP,即I/O完成端口(Input/Output Completion Port),是Windows操作系统中的一种高效处理I/O操作的机制。它是Windows系统对异步I/O的一种高级实现,特别适合于高并发、高性能的网络服务器开发。本文将深入探讨...

    IOCP-完成端口-改编CSDN小猪例子

    http://blog.csdn.net/piggyxp/article/details/6922277#comments这篇文章很好的描述了IOCP并提供了很好的客户端和服务器例子;而我为服务器添加了发送数据,采用WSASend函数,然后给客户端加了接收数据,方便测试...

    一个简单的IOCP服务器/客户端类

    - "IOCP-SRC"可能包含以下部分: - Server类:包含IOCP服务器的实现,包括初始化、监听、接收连接和处理I/O请求等功能。 - Client类:包含IOCP客户端的实现,包括连接服务器、发送和接收数据等操作。 - 工作线程...

    Iocp-stable-server.rar_IOCPTest.sln_Iocp-stable-server_iocp C++_

    标题中的"Iocp-stable-server"表明这是一个基于IOCP(Input/Output Completion Port)的稳定服务器实现,而...通过学习和实践这个"Iocp-stable-server"项目,开发者可以提升自己在构建高性能网络服务方面的技能。

    IOCP服务交互源码-易语言.zip

    易语言是一种基于中文编程的计算机程序设计语言,它旨在降低编程技术门槛,让不懂英文的用户也能进行软件开发。在“IOCP服务交互源码-易语言.zip”这个压缩包中,包含的是使用易语言实现的IO Completion Port(IOCP...

    IOCP完成端口--收发不同的数据包

    根据IOCP聊天室程序修改的一套网络聊天源码,在完成端口实现聊天室的基础上,增加了收发数据结构不一样结构体的操作,一直在寻找怎么样实现WINSOCK中如何收发数据结构不一样数据包。不懈努力下终于实现,本人技术...

    libevent-1.4.4-iocp-3

    《深入理解libevent-1.4.4-iocp-3:事件驱动编程与I/O复用技术的应用》 在IT行业中,事件驱动编程和I/O复用技术是构建高性能网络服务的关键技术之一。libevent是一个广泛使用的开源库,它为开发者提供了在多种操作...

    iocp小牛远控1.0代码-开源

    "控件版本"可能指的是服务器端的控制台界面或者特定的GUI组件,而"iocp-Server"则表明这部分代码专注于实现基于IOCp的服务器端I/O操作。 从这些文件名中,我们可以推测该开源项目不仅涉及了IOCp的基本原理和实现,...

    VC++ socket iocp 程序

    **VC++ Socket IOCP程序详解** 在Windows操作系统中,I/O完成端口(IOCP,Input/Output Completion Port)是一种高效、多线程的异步I/O模型,特别适合于处理大量的并发网络连接。在VC++环境下,我们可以利用IOCP来...

    VC IOCP示例程序

    **VC IOCP 示例程序** **概述** VC IOCP(完成端口)示例程序是基于Microsoft Visual C++(VC)开发的一个项目,它利用了Windows操作系统中的IOCP(完成端口)技术来实现高并发、非阻塞的网络通信。IOCP是一种高级...

    IOCP-SocketAsyncEventArgs-C#高性能大容量SOCKET并发完成端口例子.zip

    在这个示例项目"IOCP-SocketAsyncEventArgs-master"中,你可能会找到以下关键组件和设计模式: 1. **Server端**:服务器端程序使用IOCP和SocketAsyncEventArgs来接收和处理客户端的连接请求。它会创建一个IOCP,...

    IOcp.rar_ iocp-server_IOCP_IOCP 文件_io_完成端口

    标签中的“_iocp-server”、“iocp”、“iocp_文件”、“io”和“完成端口”进一步强调了这个话题集中在IO完成端口技术,尤其是关于文件操作的实现。IOCP通常用于网络服务,例如HTTP服务器、FTP服务器,以及任何需要...

    iocp程序源码

    **IOCP(I/O完成端口)程序源码解析** IOCp(Input/Output Completion Port,输入/输出完成端口)是Windows操作系统提供的一种高效、多线程的异步I/O模型,它允许应用程序处理大量的并发连接。在Windows系统中,...

    Iocp模块G-Sockets 1.1 Demo源码

    Iocp(Input/Output Completion Port)模块是Windows操作系统中的一种高效的异步I/O模型,主要用于构建高性能的网络服务器。G-Sockets 1.1 Demo源码提供了基于Iocp的服务器实现,允许开发者深入了解如何利用Iocp来...

    Iocp模块G-Sockets 1.1 Demo源码(Bug修复)

    "Iocp模块G-Sockets 1.1 Demo源码(Bug修复)"是一个专注于网络编程的项目,特别是针对高性能服务器的设计。Iocp(Input/Output Completion Port)是Windows操作系统中的一个高级I/O模型,用于提高多线程环境下的并发...

    IOCP 结构聊天室实例

    这个程序使用了 IOCP 通讯模式. 使用了 tinyxml 来阅读配置. 可以根据xml配置客户端和服务器端的IP / 端口 ----------------------------------------------------------- 实现功能 名单,聊天,公告,GM功能 ---...

    IOCP客户端模拟程序

    《IOCP客户端模拟程序详解与应用》 IOCP(I/O完成端口)是Windows操作系统中的一种高效异步I/O模型,它在处理大量并发I/O请求时表现出极高的性能和可扩展性。本篇文章将深入探讨IOCP客户端模拟程序的设计、实现以及...

    易语言TCP-IOCP连接

    易语言TCP-IOCP连接源码,TCP-IOCP连接,服务器处理函数,子程序2,标记,向上跳转,调用子程序_,读内存整数,写内存整数,复制内存,取字节集地址,封装分包,CRC32,取类函数地址,取整数型地址,取类指针,启动,停止,取消息类型,...

Global site tag (gtag.js) - Google Analytics