温习一下AIO编程,AIO编程简单实例

新浪微博 QQ空间

完成端口(AIO)是Windows下开发多并发网络连接、异步服务器程序的最常用方案。JDK 1.7版本引入了AIO的相关封装。这里把之前写过的一个AIO例子发出来温习一下。为下一步分析JDK的AIO做一些准备工作。

下面的例子是AIO与线程池结合的例子,每个连接在建立之后即调用一次receive方法,这里的receive是不阻塞主线程的继续运行接受其他连接的。可以把这个调用想象成一个事件注册的过程,告诉内核对象,我关心这样的读取数据事件,当数据过来时,先把它收下来,放到之前用参数指定的内存缓冲区中,然后通知一下我就可以了。子线程就在等待数据接收完成的完成端口上面等待,直到数据接收完成,处理数据。让繁杂的数据接收拷贝过程交给内核对象去完成。使得线程管理和编程更加简单。

#include <stdio.h>
#include <Winsock2.h>
#include <windows.h>
#include <process.h>
#include <list>

using std::list;


#define DATA_BUFSIZE 4096

typedef struct
{
WSAOVERLAPPED overlap;
WSABUF DataBuf;
char buffer[DATA_BUFSIZE];
DWORD NumberOfBytesRecvd;
DWORD Flags;
}PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;

void err_sys(char* err)
{
fprintf(stderr, err);
WSACleanup();
exit(-1);
}

unsigned __stdcall WorkThread(void* param);

int main(int argc, char* argv[])
{
HANDLE CompletionPort;
WSADATA WsaData;
int clilen = sizeof(sockaddr_in);
sockaddr_in seraddr, cliaddr;
SOCKET listenfd, connfd;
int opt = 1;

SYSTEM_INFO systeminfo;
unsigned i;

int ret;
LPPER_IO_OPERATION_DATA lpPerIOData = NULL;

HANDLE hWorkThread;
unsigned threadID;

if (WSAStartup(0×0101, &WsaData) == SOCKET_ERROR)
{
err_sys(“WSAStartup Failed\n”);
}

CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (CompletionPort == NULL)
{
err_sys(“create port error!\n”);
}

seraddr.sin_family = AF_INET;
seraddr.sin_addr.s_addr = INADDR_ANY;
seraddr.sin_port = htons(9877);

listenfd = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (listenfd == INVALID_SOCKET)
{
err_sys(“socket error!\n”);
}

if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, sizeof(opt)) < 0)
{
err_sys(“setsockopt error!\n”);
}

if (bind(listenfd, (SOCKADDR *) &seraddr, sizeof(seraddr)) == SOCKET_ERROR)
{
err_sys(“bind error\n”);
}

if (listen(listenfd, 5) == SOCKET_ERROR)
{
err_sys(“listen error!\n”);
}

printf(“server is listening…\n”);

if (CreateIoCompletionPort((HANDLE) listenfd, CompletionPort,
(DWORD) listenfd, 0) == NULL)
{
err_sys(“associate lisenfd error!\n”);
}

GetSystemInfo(&systeminfo);

//创建工作线程
for (i = 0; i < systeminfo.dwNumberOfProcessors; i++)
{
hWorkThread = (HANDLE) _beginthreadex(NULL, 0, &WorkThread,
CompletionPort, 0, &threadID);
CloseHandle(hWorkThread);
}

list<LPPER_IO_OPERATION_DATA>* ioplist = new list<LPPER_IO_OPERATION_DATA>();

while (true)
{
if ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &clilen)) ==
SOCKET_ERROR)
{
int code = WSAGetLastError();
printf(“accept error!\n”);
continue;
}

//将连接句柄关联到I/O完成端口
if (CreateIoCompletionPort((HANDLE) connfd, CompletionPort,
(DWORD) connfd, 0) == NULL)
{
printf(“associate connfd error!\n”);
closesocket(connfd);
continue;
}

lpPerIOData = (LPPER_IO_OPERATION_DATA)
malloc(sizeof(PER_IO_OPERATION_DATA));
ZeroMemory(lpPerIOData, sizeof(PER_IO_OPERATION_DATA));
lpPerIOData->DataBuf.len = DATA_BUFSIZE;
lpPerIOData->DataBuf.buf = lpPerIOData->buffer;
printf(“0 lpPerIOData = %x\n”, lpPerIOData);
ioplist->push_front(lpPerIOData);
ret = WSARecv(connfd, &lpPerIOData->DataBuf, 1,
&lpPerIOData->NumberOfBytesRecvd,
&lpPerIOData->Flags, &lpPerIOData->overlap, NULL);
if (ret == SOCKET_ERROR)
{
ret = WSAGetLastError();
if (ret != WSA_IO_PENDING)
{
printf(“WSARecv error!\n”);
closesocket(connfd);
}
}
}

for(list<LPPER_IO_OPERATION_DATA>::iterator itr = ioplist->begin(); itr != ioplist->end(); itr++)
{
free(*itr);
}
delete ioplist;

CloseHandle(CompletionPort);
closesocket(listenfd);
WSACleanup();
return 0;
}

//工作线程
unsigned __stdcall WorkThread(void* param)
{
int ret;
DWORD BytesTransferred;
SOCKET connfd;

LPPER_IO_OPERATION_DATA lpPerIOData = NULL;

HANDLE cport = (HANDLE) param;

printf(“In work thread…\n”);
while (true)
{
printf(“1 lpPerIOData = %x\n”, lpPerIOData);

//等待I/O完成通知
ret = GetQueuedCompletionStatus(cport, &BytesTransferred,
(LPDWORD) & connfd,\
(LPOVERLAPPED *) &lpPerIOData, INFINITE);
if (ret == 0 || BytesTransferred == 0)
{
//close connection
closesocket(connfd);
}
else
{
printf(“2 lpPerIOData = %x\n”, lpPerIOData);

printf(“recv %d bytes!\n”, BytesTransferred);
ret = send(connfd, lpPerIOData->buffer, BytesTransferred, 0);
if (ret == SOCKET_ERROR)
{
printf(“send error!\n”);
closesocket(connfd);
continue;
}
else
{
printf(“send %d bytes!\n”, ret);
}

ZeroMemory(&lpPerIOData->overlap, sizeof(OVERLAPPED));
ret = WSARecv(connfd, &lpPerIOData->DataBuf, 1,
&lpPerIOData->NumberOfBytesRecvd,
&lpPerIOData->Flags, &lpPerIOData->overlap, NULL);
if (ret == SOCKET_ERROR)
{
ret = WSAGetLastError();
if (ret != WSA_IO_PENDING)
{
printf(“WSARecv error!\n”);
closesocket(connfd);
}
}
}
}
_endthreadex(0);
return 0;
}

新浪微博 QQ空间

| 1 分2 分3 分4 分5 分 (4.80- 5票) Loading ... Loading ... | 这篇文章归档在:C/C++, IO编程. | 永久链接:链接 | 评论(0) |

评论

邮箱地址不会被泄露, 标记为 * 的项目必填。

8 - 2 = *



You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <img alt="" src="" class=""> <pre class=""> <q cite=""> <s> <strike> <strong>

返回顶部