找回密码
 会员注册
查看: 100|回复: 0

20种不同并发模型示例,带你深入理解并发模型

[复制链接]

3

主题

0

回帖

10

积分

新手上路

积分
10
发表于 2024-9-20 15:17:47 | 显示全部楼层 |阅读模式
作者:lionleeli文末抽奖送书,欢迎参与~导语曾看过很多并发模型相关的文章,但是这些文章大部分只讲了并发模型的实现原理,并没有给出具体的示例代码,看完总觉得对并发模型这个知识点是一知半解,不得要领。为了掌握高并发模型,我这里抛砖引玉,实现了20种常见的并发模型,并对每种并发模型进行了性能压测和分析。由于本人水平有限,文章中难免有一些不准确或者纰漏的地方,欢迎大家批评指正。1.缘起最近看了好友推荐的一本新书《Linux后端开发工程实践》 ,该书RPC框架和微服务集群的部分甚是不错,其中的“第10章-I/O模型与并发”中介绍了17种不同的并发模型,看完之后更是感觉受益匪浅。但美中不足的是这17种并发模型只支持短连接,配套的BenchMark工具不支持发起指定的请求负载,给出的性能指标也不够丰富。受到该内容的启发,我在该内容的基础上实现了20种常见的支持长连接的并发模型,完善了协议解析效率和BenchMark工具。2.前置说明因为这20种并发模型的代码量已经达到了1万2千多行,是不适合在一篇文章中全部展示的。所以我把相关的代码开源在github上,方便大家查看。在文中介绍到相关代码时都会给出具体的代码位置,但只会在文章中贴出关键的代码,即便如此,本文的代码量依然不少,强烈建议收藏后阅读。github上的项目是MyEchoServer,项目链接为:github.com/newland20242.1项目的目录结构MyEchoServer项目的目录结构如下所示。.├── BenchMark│   ├── benchmark.cpp│   ├── client.hpp│   ├── clientmanager.hpp│   ├── makefile│   ├── percentile.hpp│   ├── stat.hpp│   └── timer.hpp├── common│   ├── cmdline.cpp│   ├── cmdline.h│   ├── codec.hpp│   ├── conn.hpp│   ├── coroutine.cpp│   ├── coroutine.h│   ├── epollctl.hpp│   ├── packet.hpp│   └── utils.hpp├── ConcurrencyModel│   ├── Epoll│   ├── EpollReactorProcessPoolCoroutine│   ├── EpollReactorProcessPoolMS│   ├── EpollReactorSingleProcess│   ├── EpollReactorSingleProcessCoroutine│   ├── EpollReactorSingleProcessET│   ├── EpollReactorThreadPool│   ├── EpollReactorThreadPoolHSHA│   ├── EpollReactorThreadPoolMS│   ├── LeaderAndFollower│   ├── MultiProcess│   ├── MultiThread│   ├── oll│   ├── ollReactorSingleProcess│   ├── rocessPool1│   ├── rocessPool2│   ├── Select│   ├── SelectReactorSingleProcess│   ├── SingleProcess│   └── ThreadPool├── readme.md└── test    ├── codectest.cpp    ├── coroutinetest.cpp    ├── makefile    ├── packettest.cpp    ├── unittestcore.hpp    └── unittestentry.cpp相关的目录说明如下。BenchMark是基准性能压测工具的代码目录。ConcurrencyModel是20种不同并发模型的代码目录,这个目录下有20个不同的子目录,每个子目录都代表着一种并发模型的实现示例。common是公共代码的目录。test目录为单元测试代码的目录。3.预备工作因为I/O模型是并发模型涉及到的关键技术点,所以我们也不会免俗,也会介绍一下常见的I/O模型。为了降低实现难度,这里我们实现了一个简单的应用层协议,并实现一些通用的基础代码,以便后续高效的实现不同的并发实例。3.1常见I/O模型常见的I/O模型有五种:阻塞I/O、非阻塞I/O、多路I/O复用、信号驱动I/O、异步I/O。其中的阻塞I/O、非阻塞I/O、多路I/O复用、信号驱动I/O都是同步IO。同步I/O和异步I/O的区别在于,是否需要进程自己再调用I/O读写函数。同步I/O需要,异步I/O不需要。3.1.1阻塞I/O在阻塞IO模式下,只要I/O暂不可用,读写操作就会被阻塞,直到I/O可用为止,在被阻塞期间,当前进程是被挂起的,这样就无法充分的使用CPU,导致并发效率低下。3.1.2非阻塞I/O在非阻塞IO模式下,读写操作都是立即返回,此时当前进程并不会被挂起,这样就可以充分的使用CPU,非阻塞I/O通常会和多路I/O复用配合着一起使用,从而实现多个客户端请求的并发处理。3.1.3多路I/O复用多路I/O复用实现了多个客户端连接的同时监听,大大提升了程序感知客户端连接可读写状态变化的效率。在Linux下多路I/O复用的系统调用为select、poll、epoll。3.1.4信号驱动I/O通过注册SIGIO信号的处理函数,实现了一个I/O就绪的通知机制,在SIGIO信号的处理函数再进行读写操作,从而避免了低效的I/O是否就绪的轮询操作。但是在信号处理函数中是不能调用异步信号不安全的函数,例如,有锁操作的函数就是异步信号不安全的,故信号驱动I/O应用的并不多。3.1.5异步I/O前面的4种I/O模型都是同步IO,最后一种I/O模型是异步IO。异步I/O就是先向操作系统注册读写述求,然后就立马返回,进程不会被挂起。操作系统在完成读写操作之后,再调用进程之前注册读写述求时指定的回调函数,或者触发指定的信号。3.2应用层协议20种并发示例实现的是最常见的Echo(回显)服务,这里我们设计了一个简单的应用层协议,格式如下图所示。协议由两部分组成,第一部分是固定长度(4字节)的协议头部,协议头部用于标识后面的变长协议体的长度,第二部分就是是具体的变长协议体。3.2.1协议实现协议的编解码在common目录的codec.hpp文件中实现,其中DeCode函数用于实现协议的流式解析。采用流式解析,能避免拒绝服务攻击。例如,攻击者创建大量的连接,然后每个连接上只发送一个字节的数据,如果采用常见的解析方式,一直在socket上读取数据,直到完成一个完整协议请求的解析。在不采用协程的情况下,不管是阻塞IO、非阻塞IO、IO复用,当前的工作进程或者线程不是被挂起(阻塞IO),就是CPU使用率飙升(非阻塞IO),服务可用的工作进程或者线程会快速被消耗完,导致服务无法对正常的客户端提供服务,从而形成拒绝服务攻击。流式解析(来多少字节,就解析多少字节)+协程切换(IO不可用时切换到其他协程)+Reactor定时器实现非阻塞IO的超时机制,就可以很好的解决这种拒绝服务攻击。3.2.2共享的二进制缓冲区这里特别说明一下二进制缓冲区的实现。在实现协议的时候,通常会「存储读取到的网络数据缓冲区」和「协议解析的缓冲区」这两份独立的缓冲区。而我这里思考后,发现其实不用多申请一块二进制缓冲区,写入读取到的网络数据和解析读取到的网络数据可以共享同一个二进制缓冲区,进而减少了内存的分配和两块内存之间的拷贝。共享的二进制缓冲区的示意图如下图所示。共享的二进制缓冲区在common目录的packet.hpp文件中实现。3.3命令行参数解析不管是BenchMark工具,还是不同的并发模型程序,都需要支持从命令行中读取动态参数的能力。因为参数解析的getopt系列函数并不易用,故参考Go语言的flag包实现,独立封装了一套易用的命令行参数解析函数。具体的实现在common目录的cmdline.h和cmdline.cpp文件中。3.4协程池实现因为有协程池相关的并发模型,所以需要实现协程池。协程池的实现在common目录的coroutine.h和coroutine.cpp文件中。特别提一下,协程池这里通过getcontext、makecontext、swapcontext这三个库函数来实现,并且通过C++11的模版函数和可变参数模板的特性,实现了支持变参列表的协程创建函数。协程创建函数的实现如下所示。template int CoroutineCreate(Schedule& schedule, Function& f, Args&... args) {  int id = 0;  for (id = 0; id state == Idle) break;  }  if (id >= schedule.coroutineCnt) {    return kInvalidRoutineId;  }  Coroutine* routine = schedule.coroutines[id];  std::function entry = std::bind(std::forward(f), std::forward<< "SingleProcess -ip 0.0.0.0 -port 1688" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << endl;}int main(int argc, char* argv[]) {  string ip;  int64_t port;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::SetUsage(usage);  CmdLine:arse(argc, argv);  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  while (true) {    int client_fd = accept(sock_fd, NULL, 0);    if (client_fd < 0) {      perror("accept failed");      continue;    }    handlerClient(client_fd);    close(client_fd);  }  return 0;}在main函数中,开启网络监听之后,就陷入死循环,在循序中获取客户端的连接,并处理客户端的请求。5.1.2 MultiProcess多进程的并发模型是专门为每个客户端连接创建一个进程,进程服务完客户端之后再退出。对应的代码如下所示。#include << "MultiProcess -ip 0.0.0.0 -port 1688" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << endl;}int main(int argc, char* argv[]) {  string ip;  int64_t port;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::SetUsage(usage);  CmdLine:arse(argc, argv);  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  childExitSignalHandler();  // 这里需要忽略子进程退出信号,否则会导致大量的僵尸进程,服务后续无法再创建子进程  while (true) {    int client_fd = accept(sock_fd, NULL, 0);    if (client_fd < 0) {      perror("accept failed");      continue;    }    pid_t pid = fork();    if (pid == -1) {      close(client_fd);      perror("fork failed");      continue;    }    if (pid == 0) {  // 子进程      handlerClient(client_fd);      close(client_fd);      exit(0);  // 处理完请求,子进程直接退出    } else {      close(client_fd);  // 父进程直接关闭客户端连接,否则文件描述符会泄露    }  }  return 0;}在main函数中,开启网络监听之后,就陷入死循环。在循环中,每获取到一个客户端的连接,就调用fork函数创建一个子进程,并在子进程中处理客户端的请求,处理完客户端的请求之后,子进程就直接退出。5.1.3 MultiThread多进程的并发模型是专门为每个客户端连接创建一个线程,线程服务完客户端之后再退出。对应的代码如下所示。#include << "MultiThread -ip 0.0.0.0 -port 1688" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << endl;}int main(int argc, char* argv[]) {  string ip;  int64_t port;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::SetUsage(usage);  CmdLine:arse(argc, argv);  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  while (true) {    int client_fd = accept(sock_fd, NULL, 0);    if (client_fd < 0) {      perror("accept failed");      continue;    }    std::thread(handlerClient, client_fd).detach();  // 这里需要调用detach,让创建的线程独立运行  }  return 0;}在main函数中,开启网络监听之后,就陷入死循环。在循环中,每获取到一个客户端的连接,就调用thread函数创建一个子线程,并在子线程中处理客户端的请求,处理完客户端的请求之后,子线程就直接退出。5.1.4 ProcessPool1多进程的并发模型需要频繁的创建和销毁进程,这会导致系统开销高,资源占用较多。而进程池的并发模型,则是预先创建指定数量的进程,每个进程不退出,而是一直为不同的客户端提供服务。这种模型可以减少进程的创建和销毁,从而提高系统的并发处理能力,降低系统开销和资源占用。对应的代码如下所示。#include << "worker_id[" << worker_id << "] deal_1w_request" << endl;      count = 0;    }  }}void usage() {  cout << "ProcessPool1 -ip 0.0.0.0 -port 1688" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << endl;}int main(int argc, char* argv[]) {  string ip;  int64_t port;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::SetUsage(usage);  CmdLine:arse(argc, argv);  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  for (int i = 0; i < GetNProcs(); i++) {    pid_t pid = fork();    if (pid < 0) {      perror("fork failed");      continue;    }    if (0 == pid) {      handler(i, sock_fd);  // 子进程陷入死循环,处理客户端请求      exit(0);    }  }  while (true) sleep(1);  // 父进程陷入死循环  return 0;}在main函数中,开启监听之后,根据系统当前可用的CPU核数,预先创建数量与之相等的子进程。每个子进程都陷入死循环一直监听客户端连接的到来并给客户端提供服务。5.1.5 ProcessPool2在前面的进程池并发模型中,所有的子进程都会调用accept函数来接受新的客户端连接。这种方式存在竞争,当客户端新的连接到来时,多个子进程之间会争夺接受连接的机会。在操作系统内核2.6版本之前,所有子进程都会被唤醒,但只有一个可以accept成功,其他失败,并设置EGAIN错误码。这种方式会导致不必要的系统调用,降低系统的性能。在内核2.6版本及之后,新增了互斥等待变量,只有一个子进程会被唤醒,减少了不必要的系统调用,提高了系统的性能。这种方式称为"惊群"问题的解决方案,可以有效地避免不必要的系统调用,提高系统的并发处理能力。虽然内核2.6版本及之后只有一个子进程被唤醒,但仍然存在互斥等待,这种方式并不够优雅。我们可以使用socket套接字的SO_REUSEPORT选项,让多个进程同时监听在相同的网络地址(IP+Port)上,内核会自动在多个进程之间做连接的负载均衡,而不存在互斥等待行为,从而提高系统的性能和可靠性。对应的代码如下所示。#include << "worker_id[" << worker_id << "] deal_1w_request" << endl;      count = 0;    }  }}void usage() {  cout << "ProcessPool2 -ip 0.0.0.0 -port 1688" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << endl;}int main(int argc, char* argv[]) {  string ip;  int64_t port;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::SetUsage(usage);  CmdLine:arse(argc, argv);  for (int i = 0; i < GetNProcs(); i++) {    pid_t pid = fork();    if (pid < 0) {      perror("fork failed");      continue;    }    if (0 == pid) {      handler(i, ip, port);  // 子进程陷入死循环,处理客户端请求      exit(0);    }  }  while (true) sleep(1);  // 父进程陷入死循环  return 0;}在main函数中,我们根据系统当前可用的CPU核数,预先创建数量与之相等的子进程。每个子进程都创建自己的socket套接字,设置SO_REUSEPORT选项,并在相同的网络地址开启监听。最后,每个子进程都陷入死循环,等待客户端请求的到来,并为其提供服务。5.1.6 ThreadPool在线程池的并发模型中,我们预先创建指定数量的线程,每个线程都不退出,一直等待客户端连接的到来,并为其提供服务。这种方式可以避免频繁地创建和销毁线程,提高系统的性能和效率,同时也可以降低系统的开销和资源占用。线程池并发模型的代码如下所示。#include << "Select -ip 0.0.0.0 -port 1688" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::SetUsage(usage);  CmdLine:arse(argc, argv);  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  int max_fd;  fd_set read_set;  SetNotBlock(sock_fd);  unordered_set << "Poll -ip 0.0.0.0 -port 1688" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::SetUsage(usage);  CmdLine:arse(argc, argv);  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  int nfds = 0;  pollfd *fds = nullptr;  std::unordered_set<< "Epoll -ip 0.0.0.0 -port 1688 -la" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << "    -la,--la       loop accept" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  bool is_loop_accept;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::BoolOpt(&is_loop_accept, "la");  CmdLine::SetUsage(usage);  CmdLine:arse(argc, argv);  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  epoll_event events[2048];  int epoll_fd = epoll_create(1);  if (epoll_fd < 0) {    perror("epoll_create failed");    return -1;  }  cout << "loop_accept = " << is_loop_accept << endl;  Conn conn(sock_fd, epoll_fd, false);  SetNotBlock(sock_fd);  AddReadEvent(&conn);  while (true) {    int num = epoll_wait(epoll_fd, events, 2048, -1);    if (num < 0) {      perror("epoll_wait failed");      continue;    }    for (int i = 0; i < num; i++) {      Conn *conn = (Conn *)events[i].data.ptr;      if (conn-><< "SelectReactorSingleProcess -ip 0.0.0.0 -port 1688" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::SetUsage(usage);  CmdLine:arse(argc, argv);  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  int max_fd;  fd_set read_set;  fd_set write_set;  SetNotBlock(sock_fd);  unordered_set << "PollReactorSingleProcess -ip 0.0.0.0 -port 1688" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::SetUsage(usage);  CmdLine:arse(argc, argv);  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  int nfds = 0;  pollfd *fds = nullptr;  unordered_set<< "EpollReactorSingleProcess -ip 0.0.0.0 -port 1688 -multiio -la -writefirst" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << "    -multiio,--multiio  multi io" << endl;  cout << "    -la,--la       loop accept" << endl;  cout << "    -writefirst--writefirst write first" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  bool is_multi_io;  bool is_loop_accept;  bool is_write_first;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::BoolOpt(&is_multi_io, "multiio");  CmdLine::BoolOpt(&is_loop_accept, "la");  CmdLine::BoolOpt(&is_write_first, "writefirst");  CmdLine::SetUsage(usage);  CmdLine::Parse(argc, argv);  cout << "is_loop_accept = " << is_loop_accept << endl;  cout << "is_multi_io = " << is_multi_io << endl;  cout << "is_write_first = " << is_write_first << endl;  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  epoll_event events[2048];  int epoll_fd = epoll_create(1);  if (epoll_fd < 0) {    perror("epoll_create failed");    return -1;  }  Conn conn(sock_fd, epoll_fd, is_multi_io);  SetNotBlock(sock_fd);  AddReadEvent(&conn);  while (true) {    int num = epoll_wait(epoll_fd, events, 2048, -1);    if (num < 0) {      perror("epoll_wait failed");      continue;    }    for (int i = 0; i < num; i++) {      Conn *conn = (Conn *)events[i].data.ptr;      if (conn-><< "EpollReactorSingleProcessET -ip 0.0.0.0 -port 1688" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::SetUsage(usage);  CmdLine::Parse(argc, argv);  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  epoll_event events[2048];  int epoll_fd = epoll_create(1);  if (epoll_fd < 0) {    perror("epoll_create failed");    return -1;  }  Conn conn(sock_fd, epoll_fd, true);  SetNotBlock(sock_fd);  AddReadEvent(&conn);  while (true) {    int num = epoll_wait(epoll_fd, events, 2048, -1);    if (num < 0) {      perror("epoll_wait failed");      continue;    }    for (int i = 0; i < num; i++) {      Conn *conn = (Conn *)events[i].data.ptr;      if (conn-><< "EpollReactorSingleProcessCoroutine -ip 0.0.0.0 -port 1688 -d" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << "    -d,--d         dynamic epoll time out" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  bool is_dynamic_time_out{false};  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::BoolOpt(&is_dynamic_time_out, "d");  CmdLine::SetUsage(usage);  CmdLine::Parse(argc, argv);  int sock_fd = CreateListenSocket(ip, port, false);  if (sock_fd < 0) {    return -1;  }  epoll_event events[2048];  int epoll_fd = epoll_create(1);  if (epoll_fd < 0) {    perror("epoll_create failed");    return -1;  }  cout << "is_dynamic_time_out = " << is_dynamic_time_out << endl;  EventData event_data(sock_fd, epoll_fd);  SetNotBlock(sock_fd);  AddReadEvent(epoll_fd, sock_fd, &event_data);  MyCoroutine::Schedule schedule;  MyCoroutine::ScheduleInit(schedule, 5000);  // 协程池初始化  int msec = -1;  while (true) {    int num = epoll_wait(epoll_fd, events, 2048, msec);    if (num < 0) {      perror("epoll_wait failed");      continue;    } else if (num == 0) {  // 没有事件了,下次调用epoll_wait大概率被挂起      sleep(0);  // 这里直接sleep(0)让出cpu,大概率被挂起,这里主动让出cpu,可以减少一次epoll_wait的调用      msec = -1;  // 大概率被挂起,故这里超时时间设置为-1      continue;    }    if (is_dynamic_time_out) msec = 0;  // 下次大概率还有事件,故msec设置为0    for (int i = 0; i < num; i++) {      EventData *event_data = (EventData *)events[i].data.ptr;      if (event_data-><< "accept mainReactor unix_socet connect. pid = " << getpid() << endl;        });        continue;      }      if (conn-><< "EpollReactorProcessPoolCoroutine -ip 0.0.0.0 -port 1688 -poolsize 8" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << "    -poolsize,--poolsize   pool size" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  int64_t pool_size;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::Int64OptRequired(&pool_size, "poolsize");  CmdLine::SetUsage(usage);  CmdLine::Parse(argc, argv);  pool_size = pool_size ><< "EpollReactorThreadPool -ip 0.0.0.0 -port 1688 -poolsize 8" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << "    -poolsize,--poolsize   pool size" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  int64_t pool_size;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::Int64OptRequired(&pool_size, "poolsize");  CmdLine::SetUsage(usage);  CmdLine::Parse(argc, argv);  pool_size = pool_size ><< "EpollReactorThreadPoolHSHA -ip 0.0.0.0 -port 1688 -io 3 -worker 8 -direct" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << "    -io,--io       io thread count" << endl;  cout << "    -worker,--worker   worker thread count" << endl;  cout << "    -direct,--direct   direct send response data by worker thread" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  int64_t io_count;  int64_t worker_count;  bool is_direct;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::Int64OptRequired(&io_count, "io");  CmdLine::Int64OptRequired(&worker_count, "worker");  CmdLine::BoolOpt(&is_direct, "direct");  CmdLine::SetUsage(usage);  CmdLine::Parse(argc, argv);  cout << "is_direct=" << is_direct << endl;  io_count = io_count ><< "EpollReactorThreadPoolMS -ip 0.0.0.0 -port 1688 -main 3 -sub 8 -mainread" << endl;  cout << "options:" << endl;  cout << "    -h,--help      print usage" << endl;  cout << "    -ip,--ip       listen ip" << endl;  cout << "    -port,--port   listen port" << endl;  cout << "    -main,--main   mainReactor count" << endl;  cout << "    -sub,--sub     subReactor count" << endl;  cout << "    -mainread,--mainread mainReactor read" << endl;  cout << endl;}int main(int argc, char *argv[]) {  string ip;  int64_t port;  int64_t main_reactor_count;  int64_t sub_reactor_count;  bool is_main_read;  CmdLine::StrOptRequired(&ip, "ip");  CmdLine::Int64OptRequired(&port, "port");  CmdLine::Int64OptRequired(&main_reactor_count, "main");  CmdLine::Int64OptRequired(&sub_reactor_count, "sub");  CmdLine::BoolOpt(&is_main_read, "mainread");  CmdLine::SetUsage(usage);  CmdLine::Parse(argc, argv);  cout << "is_main_read=" << is_main_read << endl;  main_reactor_count = main_reactor_count >
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2024-12-27 01:24 , Processed in 1.173752 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表