?? epoll_td2.cc
字號:
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#define MAXLINE 10
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5555
#define INFTIM 1000
#define MAXFDS 16 * 10000
#define EVENTSIZE 15 * 10000
// 線程池任務隊列結構體
struct task
{
int fd; // 需要讀寫的文件描述符
struct task *next; // 下一個任務
};
// 用于讀寫兩個的兩個方面傳遞參數
struct user_data
{
int fd;
unsigned int n_size;
char line[MAXLINE];
};
// 線程的任務函數
void *readtask(void *args);
void *writetask(void *args);
// 聲明epoll_event結構體的變量,ev用于注冊事件,數組用于回傳要處理的事件
struct epoll_event ev, events[EVENTSIZE];
int epfd;
pthread_mutex_t mutex;
pthread_cond_t cond1;
struct task *readhead=NULL,*readtail=NULL,*writehead=NULL;
// 設置非阻塞模式
void setnonblocking(int sock)
{
int opts;
opts = fcntl(sock,F_GETFL);
if(opts < 0)
{
perror("fcntl(sock,GETFL)");
exit(1);
}
opts = opts | O_NONBLOCK;
if(fcntl(sock, F_SETFL, opts) < 0)
{
perror("fcntl(sock,SETFL,opts)");
exit(1);
}
}
// 發送數據
ssize_t socket_send(int sockfd, const char *buffer, size_t buflen)
{
ssize_t tmp;
size_t total = buflen;
const char *p = buffer;
for( ; ; )
{
tmp = send(sockfd, p, total, 0);
if(tmp < 0)
{
// 當send收到信號時,可以繼續寫,但這里返回-1.
if(errno == EINTR)
{
return -1;
}
// 當socket是非阻塞時,如返回此錯誤,表示寫緩沖隊列已滿,
// 在這里做延時后再重試.
if(errno == EAGAIN)
{
usleep(1000);
continue;
}
return -1;
}
if((size_t)tmp == total)
{
return buflen;
}
total -= tmp;
p += tmp;
}
return tmp;
}
// 接收數據
ssize_t socket_recv(int sockfd, char *buffer, size_t buflen)
{
int rs, n;
while(rs)
{
n = recv(sockfd, buffer, MAXLINE, 0);
if(n < 0)
{
// 由于是非阻塞的模式,所以當errno為EAGAIN時,表示當前緩沖區已無數據可讀
// 在這里就當作是該次事件已處理處.
if(errno == EAGAIN)
{
break;
}
else
{
return -1;
}
}
else if(n == 0)
{
// 這里表示對端的socket已正常關閉.
close(sockfd);
}
if(n == sizeof(buffer))
{
rs = 1; // 未讀完 需要再次讀取
}
else
{
rs = 0;
}
}
return n;
}
int main(int argc, char *argv[])
{
int i, maxi, listenfd, connfd, sockfd,nfds;
int opt = 1;
pthread_t tid1,tid2;
struct task *new_task=NULL;
struct user_data *rdata=NULL;
socklen_t clilen;
pthread_mutex_init(&mutex,NULL);
pthread_cond_init(&cond1,NULL);
// 初始化用于讀線程池的線程
pthread_create(&tid1, NULL, readtask, NULL);
pthread_create(&tid2, NULL, readtask, NULL);
// 生成用于處理accept的epoll專用的文件描述符
epfd = epoll_create(MAXFDS);
struct sockaddr_in clientaddr;
struct sockaddr_in serveraddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
// 重復綁定
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt));
// 把socket設置為非阻塞方式
setnonblocking(listenfd);
// 設置與要處理的事件相關的文件描述符
ev.data.fd = listenfd;
//設置要處理的事件類型
ev.events = EPOLLIN | EPOLLET;//EPOLLIN | EPOLLERR | EPOLLHUP;
// 注冊epoll事件
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd,&ev);
bzero(&serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
//char *local_addr="192.168.100.5";
//inet_aton(local_addr,&(serveraddr.sin_addr));
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(SERV_PORT);
bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
listen(listenfd, LISTENQ);
maxi = 0;
for ( ; ; )
{
// 等待epoll事件的發生
nfds = epoll_wait(epfd, events, EVENTSIZE, -1);
// 處理所發生的所有事件
for(i = 0; i < nfds; ++i)
{
if(events[i].data.fd == listenfd)// 客戶端;連接事件
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);
if(connfd < 0)
{
perror("connfd < 0");
exit(1);
}
setnonblocking(connfd);
unsigned char *sip = (unsigned char *)&clientaddr.sin_addr.s_addr;
printf("%d connect from >> %d.%d.%d.%d\n", connfd, sip[0], sip[1], sip[2], sip[3]);
// 設置用于讀操作的文件描述符
ev.data.fd = connfd;
// 設置用于注測的讀操作事件
ev.events = EPOLLIN | EPOLLET;
//注冊ev
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
}
else if(events[i].events & EPOLLIN)// 有讀數據
{
printf("reading!\n");
if ( (sockfd = events[i].data.fd) < 0)
{
continue;
}
new_task = new task();
new_task->fd = sockfd;
new_task->next = NULL;
// 添加新的讀任務
pthread_mutex_lock(&mutex);
if(readhead==NULL)
{
readhead = new_task;
readtail = new_task;
}
else
{
readtail->next = new_task;
readtail = new_task;
}
// 喚醒所有等待cond1條件的線程
pthread_cond_broadcast(&cond1);
pthread_mutex_unlock(&mutex);
}
else if(events[i].events & EPOLLOUT)// 有寫數據
{
rdata = (struct user_data *)events[i].data.ptr;
sockfd = rdata->fd;
write(sockfd, rdata->line, rdata->n_size);
delete rdata;
// 設置用于讀操作的文件描述符
ev.data.fd = sockfd;
// 設置用于注測的讀操作事件
ev.events = EPOLLIN | EPOLLET;
// 修改sockfd上要處理的事件為EPOLIN
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
}
else if(events[i].events & EPOLLHUP)
{
// 斷開的情況,刪除事件,然后關閉fd
rdata = (struct user_data *)events[i].data.ptr;
sockfd = rdata->fd;
// 設置用于讀操作的文件描述符
ev.data.fd = sockfd;
ev.events = 0;
epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, &ev);
//struct epoll_event event; //event.events = 0; //event.data.u64 = 0; //event.data.ptr = sockfd; //epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, &event);
printf("epollhup %d\n", sockfd);
close(sockfd);
}
}
}
}
void *readtask(void *args)
{
int fd = -1;
unsigned int n;
//用于把讀出來的數據傳遞出去
struct user_data *data = NULL;
for( ; ; )
{
pthread_mutex_lock(&mutex);
//等待到任務隊列不為空
while(readhead == NULL)
{
pthread_cond_wait(&cond1, &mutex);
}
fd = readhead->fd;
// 從任務隊列取出一個讀任務
struct task *tmp = readhead;
readhead = readhead->next;
delete tmp;
pthread_mutex_unlock(&mutex);
data = new user_data();
data->fd = fd;
if ((n = read(fd, data->line, MAXLINE)) < 0)
{
if (errno == ECONNRESET)
{
std::cout<<"errno == ECONNRESET "<<fd<<std::endl;
close(fd);
}
else
{
std::cout<<"readline error "<<fd<<std::endl;
}
if(data!=NULL)
{
delete data;
}
}
else if (n == 0)
{
printf("%d Client close connect!\n", fd);
close(fd);
if(data != NULL)
{
delete data;
}
}
else
{
// 通知寫事件
data->n_size = n;
//設置需要傳遞出去的數據
ev.data.ptr = data;
//設置用于注測的寫操作事件
ev.events = EPOLLOUT | EPOLLET;
//修改sockfd上要處理的事件為EPOLLOUT
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -