打印

reactor模式--一个针对基于事件驱动的端口监控和服务多路分发的面向对象包容器

[复制链接]
1358|7
手机看帖
扫描二维码
随时随地手机跟帖
跳转到指定楼层
楼主
keer_zu|  楼主 | 2015-8-21 17:33 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
Douglas C. Schmidt
schmdit@cs.wustl.edu
http://www.cs.wustl.edu/~schmidt/
华盛顿大学计算机科学系,圣路易斯 63130


本文的一个较早版本发表在 1993 年 《C++ Report》 的二月号上。

一、简介

     本文是阐述将现有操作系统进程间通信(IPC)服务封装到面向对象(OO)C++包容器(Wrapper)中的系列论文中第三篇的一个部分。第一篇解释了面向对象包容器的主要原理和目的[1],包容器简化了正确、简洁、可移植以及高效的应用程序的开发。第二篇阐述了一个封装调用应用程序编程接口(API)的 BSD 套接字和 System VTLI,称为 IPC_SAP 的面向对象包容器。IPC_SAP 可以让应用程序访问本地和远程 IPC 协议簇,如,通过一个类型安全、面向对象接口访问 TCP/IP。

     本文(第三篇)展示了一个针对由 select和 poll 系统调用(注: select 调用可以在 BSD 和 SVR4 UNIX 平台,以及随 WinSock API 使用;poll 可以在 System V 的 UNIX 变体中使用)提供的调用输入输出(I/O)端口监控和基于定时器的事件通知工具的面向对象包容器。Select 和 poll 可以让应用程序制定一个超时时间间隔来等待在一个或多个 I/O 描述符上的不同类型的输入输出事件的出现。Select 和 poll 探测某些 I/O或定时器事件何时出现,并将这些事件多路分发到恰当的应用程序。与其它的 OS API 一样,事件多路分发接口是复杂、易出错、不可移植并且不容易扩展。一个称作反应堆(Ractor)的可扩展模式的开发正是克服这些限制。反应堆为简化基于事件驱动的分布式应用的开发和实现,提供一组高级一些的编程抽象。反应堆也保护开发人员陷入现有事件多路分发 API 的许多易出错的细节中,并且改善了应用程序在不同的操作系统变体中的可移植性。

      反应堆多少不同于在[2]中阐述的 IPC_SAP 类封装。IPC_SAP 对 BSD 套接字和 System V TLI API 添加了一层相对瘦的面向对象修饰。另一方面,反应堆提供了一组比那些由 select 和 poll 直接提供的 API 更为丰富的抽象。特别是,反应堆将基于 I/O 的端口监控与基于定时器的事件通知集成起来以为应用程序通信服务的多路分发提供一个通用的框架。端口监控由在许多连接上同时执行 I/O 的事件驱动网络服务器所使用。因为这些服务器必须处理多个连接,所以在单个连接上无限地执行阻塞是不可行的。同样,基于定时器的 API 允许应用程序注册某些通过由反应堆控制的中心定时器服务激活的周期性或非周期性操作。

      这一主题分为两部分。第一部分(在本文中展示)阐述了一个旨在满足高效的事件多路分发需要的分布式日志服务,测试几个可选的解决方案,评估这些可选方案的优缺点,并和反应堆进行比较。第二部分(正在 C++ Report 的随后一期上发表)集中在反应堆的 OO 设计方面。另外,它讨论了这个分布式日志服务的设计和实现。这个例子精确地说明了反应堆如何简化事件驱动的分布式应用的开发。

相关帖子

沙发
keer_zu|  楼主 | 2015-8-21 17:33 | 只看该作者
二、例子:一个分布式日志服务

      要洞悉事件多路分发机制的功能,这一部分阐述了一个同时处理来自多个源头的事件驱动 I/O 的分布式日志服务。如图1 所示,这个分布式日志服务为那些在整个网络中并发操作的应用程序提供几个服务。首先,它为经常用于简化管理的某些状态信息的记录以及分布式应用程序行为的跟踪提供一个集中的地点。要利用这一点,客户守护程序将送出的日志记录打上时间戳,以允许按时间顺序跟踪和重构在不同的主机上执行的多个并发进程的执行顺序。其次,这个功能也允许日志记录按优先级递送。这些记录由客户守护程序按照它们的重要性顺序来接受和转发,而不是以它们最初生成的顺序。

      将许多分布式应用程序的日志活动集中到单一服务器中也很有用,因为通过串行访问来共享诸如控制台、打印机、文件或者网络管理数据库等输出设备。与此相反,没有这样一个集中式的工具,监控和调试由多个并发进程组成的应用程序会变得很困难。例如,来自由多个进程或者线程同时调用的普通 C stdio 库例程(如,fputs 和 printf)的输出经常混杂在一起,当这些输出在同一个窗口或控制台的时候。

      分布式日志工具使用一个客户/服务器架构来设计。服务器日志守护进程收集、格式化并输出从运行在整个局域网和/或广域网的多个主机上的客户端守护进程转发来的日志记录。来自日志服务器的输出可能被重定向到不同的设备,如,打印机、持久存储仓库、或者日志管理控制台。

设计模式系列之三——反应堆(Reactor)
图1: 用于分布式日志工具的网络环境

      如图1中所示,日志工具的进程间通信(IPC)结构引入了几级多路分发。例如,网络中的每一个客户主机包含多个可能参与分布式日志工具的应用程序进程(如 P1、P2 以及 P3)。每一个参与进程使用图1中用方框表示的应用程序日志 API来将调试跟踪和错误诊断格式化到日志记录中。一个日志记录是一个包含几个头区域和一个最大约 1K 字节的对象。当由应用程序进程调用的时候,Log_Msg::log API 预先将当前进程的标识和程序名称加入记录。然后它使用“面向记录”的命名管道 IPC 机制来将记录多路分发到一个运行在网络中指定主机上的服务器日志守护进程。服务器以事件驱动的方式操作,一旦来自多个客户守护进程的日志记录到达时,就处理它们。取决于参与应用程序的日志行为,日志记录可能有任意的客户程序发送,并且在服务器守护进程中在任意时间间隔内到达。

      在每一个客户日志守护进程和指定的服务器日志守护进程之间建立一个单独的 TCP 流连接。每一个客户端连接由服务器中的一个唯一 I/O 描述符表示。另外,服务器也维护一个专门的 I/O 描述符接受来自那些希望参与分布式日志工具的客户守护进程的新连接。在连接建立期间,服务器缓存客户端的主机名(在日志服务器守护进程中用椭圆形表示),并且使用这个信息以打印到输出设备上的格式化记录来标识客户端。

      完整的分布式日志工具的设计和实现在[3]中阐述。本文剩余部分通过探索几个处理来自多个来源的 I/O 可选的机制来展示必要的背景材料。

使用特权

评论回复
板凳
keer_zu|  楼主 | 2015-8-21 17:33 | 只看该作者
三、操作系统的事件多路分发

      现代操作系统,如,UNIX、Windows NT 以及 OS/2 提供了让应用程序在多个描述符上“同时”执行 I/O 的几种技术。本小节阐述四种不同的可选方案,并且比较和对照它们的优缺点。要集中讨论,每一个可选方案都按照在前面第二节阐述的分布式日志工具来讨论其特点。特别是,每一节表现一个使用正在讨论的可选方案实现的框架服务器日志守护进程。为了节省版面并增加清晰性,例子都利用在前一个 C++ Report **[2]中讨论的 OO IPC_SAP 套接字封装库。

      在图2中所示的 handle_logging_record 函数也会由所有的示例服务器守护进程调用。这个函数负责接收并处理日志记录,并且将它们写到适当的输出设备。串行访问输出设备所要求的任何同步机制也在这个函数中执行。通常,开发并行的多进程和多线程方式总归更加复杂,因为输出必须是串行的,以避免由所有不同进程的产生的日志记录混杂在一起。要完成输出的串行化,并行的服务器守护进程通过使用 handle_logging_record 程序中一些同步形式(如,信号量、锁或者其它像 FIFO 或者消息队列等 IPC 机制)来协作。
typedef int ACE_HANDLE;
const int ACE_INVALID_HANDLE - 1;

// Perform two recvs to simulate a message-oriented services
// via the underlying bytestream-oriented TCP connection.
// The first recv reads the length (stored as a fixed-size
// integer) of the adjacent logging record. The second recv
// then reads "length" bytes to obtain the actual record.
// Note that the sender must also follow this protocol...

ssize_t
handle_logging_record( ACE_HANDLE handle )
{
  size_t msg_len;
  LOG_PDU log_pdu;

  ssize_t n = ACE_OS::recv( handle, (char*)&msg_len, sizeof msg_len );

  if ( n != sizeof msg_len )
    return n;
  else
  {
    msg_len = ntohl( msg_len );  // Convert byte-ordering.

    n = ACE_OS::recv( handle, (char*)&log_pdu, msg_len );
    if ( n!= msg_len )
      return -1;     
    log_pdu.decode();
    if ( log_pdu.get_len() == n )
      // Obtain lock here for concurrent designs.
      log_pdu.print( output_device );
      // Release lock here for concurrent designs.
    return n;
    }
  }
}
    图2: 处理日志记录的函数

3.1 一个非阻塞 I/O 解决方案


设计模式系列之三——反应堆(Reactor)
图3:非阻塞 I/O 服务器

      在多个描述符上处理 I/O 的一个方法包括对“轮询”的使用。通过循环地遍历一组打开的描述符,检查每一个挂起的 I/O 活动来进行轮询。图4 表现了说明这种方式的一般性结构的代码片断。最初,创建一个 IPC_SAP 接收器对象,并通过 ACE_SOCK_Acceptor::enable 成员函数设置成“非阻塞模式”。随后,服务器的主循环在打开的描述符之间迭代,尝试接收从每一个描述符输入的日志记录。如果输入立即可用,那么其会被读取并处理。否则,handle_logging_record 函数返回 -1。errno 被置为 EWOULDBLOCK,并且循环在下一个描述符上继续轮询。在所有打开的 I/O 连接都完成一次轮询之后,服务器接受任何一个已到达的新连接,并且再次从头开始对描述符轮询。当 handle_logging_record 函数返回 0(象征客户程序已关闭连接)时,关闭相应的 I/O 描述符。此时,服务器产生一个最高优先级描述符的副本,并且将其存储到最后一个描述符的插座号中(为了将描述符维持在一个连续的范围中)。与此相反,图5 说明了一个使用 fd_set 位掩码来知道当前活动的那些描述符的相似方案。

      轮询的主要缺点是在“忙等待”时执行了不必要的系统调用而消耗了过多的 CPU 周期。例如,如果在 I/O 描述符上只是间歇地出现输入,服务器进程将会重复地并且过多地轮询那些并没有任何挂起日志记录的描述符。另一方面,如果在所有的描述符上持续地接收到 I/O,这个方法可能是合理的。另外,轮询的一个优势是其可以在操作系统平台之间移植。
const u_short LOGGER_PORT = 10000;

int
main( void )
{
  // Create a server end-point.
  ACE_SOCK_Acceptor acceptor( (ACE_INET_Addr)LOGGER_PORT );
  ACE_SOCK_Stream new_stream;

  // Extract descriptor.
  ACE_HANDLE s_handle = acceptor.get_handle();
  ACE_HANDLE maxhandlepl = s_handle + 1;

  // Set acceptor in non-blocking mode.
  acceptor.enable( ACE_NONBLOCK );

  // Loop forever performing logger server processing.
  for (;;)
  {
    // Poll each descriptor to see if logging
    // records are immediately available on
    // active network connections.
    for ( ACE_HANDLE handle = s_handle + 1; handle < maxhandlpl; handle ++ )
    {
      ssize_t n = handle_logging_record( handle );
      if ( n == ACE_INVALID_HANDLE )
      {
        if ( errno == EWOULDBLOCK )  // No input pending.
          continue;
        else
          ACE_DEBUG( ( LM_DEBUG, "recv failed\n" ) );
      }
      else if ( n == 0 )
      {
        // Keep descriptors contiguous.
        ACE_OS::dup2( handle, --maxhandlepl );
        ACE_OS::close( maxhandlepl );
      }
    }
    // Check if new connection requests have arrived.
    while ( acceptor.accept( new_stream ) != -1 )
    {
       // Make new connection non-blocking.
      new_stream.enable( ACE_NONBLOCK );
      handle = new_stream.get_handle();
      ACE_ASSERT( handle + 1 == maxhandlepl );
      maxhandlepl++;
    }
    if ( errno != EWOULDBLOCK )
      ACE_DEBUG( ( LM_DEBUG, "accept failed" ) );
  } // for

}
    图4:一个非阻塞的 I/O 服务器(版本1)

int
main( void )
{
  // Create a server end-point.
  ACE_SOCK_Acceptor acceptor( ( ACE_INET_Addr )PORTNUM );
  ACE_SOCK_Stream new_stream;

  // Extract descriptor.
  ACE_HANDLE s_handle = acceptor.get_handle();
  ACE_HANDLE maxhandlepl = s_handle + 1;

  fd_set in_use;  // Bitmask for active descriptors.
  FD_ZERO( &in_use );
  FD_SET( s_handle, &in_use );

  // Set acceptor SAP into non-blocking mode.
  acceptor.enable( ACE_NONBLOCK );
  // Loop forever performing logger server processing.
  for ( ; ; )
  {
    // Pool each descriptor to see if logging
    // records are immediately available on
    // active network connection.
    for ( ACE_HANDLE handle = s_handle + 1; handle < maxhandlepl; handle++ )
    {
      ssize_t n;
      if ( FD_ISSET( handle, &in_use ) && ( n = handle_logging_record( handle ) ) == -1 )
      {
        if ( errno == EWOULDBLOCK )  // No input pending.
          continue;
        else
          ACE_DEBUG( ( LM_DEBUG, "recv failed" ) );
      }
      else if ( n == 0 )
      {
        ACE_OS::close( handle );
        FD_CLR( handle, &in_use );
        if ( handle + 1 == maxhandlepl )
        {
          // Skip past unused handles.
          while ( !FD_ISSET( --handle, &read_handles ) )
            continue;
          maxhandlepl = handle + 1;
        }
      }
    }
    // Check if new connection requests have arrived.
    while ( acceptor.accept( new_stream ) != -1 )
    {
      // Make new connection non-blocking
      new_stream.enable( ACE_NONBLOCK );
      handle = new_stream.get_handle();
      FD_SET( handle, &in_use );
      if ( handle >= maxhandlepl )
        maxhandlepl = handle + 1;
    }
    if ( errno != EWOULDBLOCK )
      ACE_DEBUG( ( LM_DEBUG, "accept failed" ) );
  }

}
图5:轮询,非阻塞 I/O 服务器(版本2)

使用特权

评论回复
地板
keer_zu|  楼主 | 2015-8-21 17:34 | 只看该作者
3.2 一个多进程解决方案

设计模式系列之三——反应堆(Reactor)
图6:多进程服务器

      另一个解决方案(如图 6 所示)与将应用程序设计成一个创建单独的 OS 进程来管理连接每一个客户日志守护进程的通信通道的“并发服务器”有关。图 7 展示了说明这项技术的代码。
// handle all logging record from a particular client (run in each slave process)
static void
logging_handler( ACE_HANDLE handle )
{
  // Perform a "blocking" receive and process client logging records until client shuts down the connection.
  for ( ssize_t n; ( n = handle_logging_record( handle ) ) > 0; )
    continue;
  if ( n == -1 )
    ACE_DEBUG( ( LM_DEBUG, "recv failed" ) );
  // shutdown the child process.
  ACE_OS::exit();
}

// Reap zombie'd children ( run in the master process ).
static void
child_reaper( int )
{
  for ( int res; ( res = ACE_OS::waitpid( -1, 0, WHOHANG ) ) > 0 || ( res == -1 && errno == EINTR ); )
    continue;
}

static_void
logging_acceptor( void )
{
  // Create a server end-point.
  ACE_SOCK_Acceptor acceptor( ( ACE_INET_Addr )LOGGER_PORT );
  ACE_SOCK_Stream new_stream;

  // Loop forever performing logging server processing.
  for ( ; ; )
  {
    // Wait for client connection request and create a new ACE_SOCK_Stream endpoint ( Note, accept is automatically restarte after interrupts ).
    acceptor.accept( new_stream );

    // Create a new process to handle client request.
    switch ( ACE_OS::fork() )
    {
    case 1:
      ACE_DEBUG( ( LM_DEBUG, "fork failed" ) );
      break;
    case 0:
      acceptor.close();
      logging_handler( new_stream.get_handle() );
     
    default:  // In parent
      break;
    }
  }

}

// Master process.
int
main()
{
  // Set up the SIGCHLD signal handler.
  sigaction sa;

  // Restart interrupted system calls.
  sa.sa_flags = SA_RESTART;
  ACE_OS::sigemptyset( &sa.sa.mask );
  sa.sa_handler = child_reaper;
  // Arrange to reap deceased children.
  if ( ACE_OS::sigaction( SIGCHLD, &sa, 0 ) == -1 )
    ACE_ERROR_RETURN( ( LM_ERROR, "sigaction", -1 );

  logging_acceptor();
}
    图7: 一个多进程服务器

      主服务器中的主循环在监听新的客户端连接请求的到来时阻塞。当请求到达时,由 fork 创建一个单独的从进程。在 logging_handler 子程序中,这个新创建的从进程在某个描述符上执行阻塞 I/O,其接收所有从其相关的那个客户端发送的所有日志记录。当对应的客户守护进程结束,从 recv 系统调用返回 0,这样结束从进程。此时,OS 发送一个 SIGCHLD 信号给主进程。Child_reaper 信号捕获这个信号并且“采集”到僵尸子进程的退出状态信息。注意服务器中信号的出现要求主进程中的主循环正确地处理中断。在大多数 UNIX 平台上,某些系统调用(例如,accept)并不会在信号发生时自动重新启动。应用程序可以在 accept 系统调用返回 ACE_INVALID_HANDLE 时通过检查 errno 是否包含 EINTR 来检查这一点。

     多进程设计有几个缺陷。首先,它可能消耗过多的 OS 资源(如,进程表插座,每一个客户端都要分配一个),这样可能增加 OS 的调度开销。其次,上下文切换在输入到达时一般会要求重新启动等待的进程。其三,正确地处理信号和中断系统调用涉及编写微妙并且有潜在错误倾向的代码。例如,sigaction 接口必须随 SVR4 使用来确保信号布署在第一个 SIGCHILD 信号被捕获后保持设置为先前注册的回调函数。最后,实现串行访问输出设备的互斥机制导致增加了软件的复杂性。对于我们所给出的“事件驱动,离散消息”的分布式日志工具的通信模式来说,这个额外的开销和复杂性未必昂贵。

      总之,某些其它类型的网络服务器从创建单独的进程来处理客户请求中获得了很可观的效益。特别是,这种方式改善了(1)I/O 有限的或者(2)涉及同时、要求大量不定时执行的长时间客户服务(例如,文件传输或者远程登录)的服务器的响应时间[5]。另一个优势是如果底层操作系统有效地支持多进程元素的话,所有的服务器性能以一种应用程序透明的方式得到改善。

使用特权

评论回复
5
keer_zu|  楼主 | 2015-8-21 17:34 | 只看该作者
3.3 多线程解决方案

设计模式系列之三——反应堆(Reactor)
图8:多线程服务器

      第三种方案利用了多线程方法。图9中展示的例子使用 Sun OS 5.x 线程库[6]来实现一个多线程并发服务器。其它的线程库(如,POSIX 和 Window NT 线程)也提供一个同等的方案。在示例代码中,由 ACE_Thread::spawn 子程序产生一个新线程来处理每一个客户连接。此外,还创建必要的栈以及执行单独的线程控制所必需的其它数据结构,ACE_Thread::spawn 子程序调用 logging_handler 函数。这个函数接收从某个特定客户端到达的所有日志记录。注意当客户端关闭时,thr_exit 子程序常常用于退出这个特定的线程,而不是整个进程。

// Handle all logging records from a particular
// client (run in each slave thread).
static void *
logging_handler( ACE_HANDLE handle )
{
  ssize_t n;

  // Perform a "blocking" receive and process
  // client logging records until client shuts
  // down the connection.
  while ( n = handle_logging_record( handle ) > 0 )
    continue;
  if ( n == -1 )
    ACE_DEBUG( ( LM_DEBUG, "recv_failed" ) );
  ACE_OS::close( handle );

  // Exits thread, *not* entire process!
  ACE_Thread::exit();
}

static void
logging_acceptor( void )
{
  // Create a server end-point.
  ACE_SOCK_Acceptor acceptor( ACE_INET_Addr ) LOGGER_PORT );
  ACE_SOCK_Stream new_stream;

  // Loop forever performing logging server processing.
  for (;;)
  {
    // Wait for client connection request and create a
    // new ACE_SOCK_Stream endpoint (automatically
    // restarted upon interrupts).
    acceptor.accept( new_stream );

    // Create a new thread to handle client request.
    if ( ACE_Thread::spawn( ACE_THR_FUNC( logging_handler ), (void*)new_stream.get_handle(), THR_DETACHED | THR_NEW_LWP ) != 0 )
      ACE_ERROR( ( LM_ERROR, "thr_create_failed" ) );
  }

}

// Master server.
int
Main( void )
{
  logging_acceptor();
}
图9:多线程服务器

int select
(
  // Maximum descriptor plus 1.
  int width;
  // bit-mask of "read" descriptors to check.
  fd_set *readfds,
  // bit-mask of "write" descriptors to check.
  fd_set *writefds,
  // bit-mask of "exception" descriptors to check.
  fd_set *exceptfds,
  // Amount of time to wait for events to occur.
  struct timeval *timeout
)
图10: select 接口

int poll
(
  // Array of descriptors of interest.
  struct pollfd fds[],
  // Number of descriptors to check.
  unsigned long nfds,
  // Length of time to wait, inmiliseconds
  int timeout
)
图11: poll 接口

      多线程方法相对容易实现,假设有一个合适的线程库可用,并且提供几个基于多进程方法的优点。例如,复杂的信号处理语义不再是问题,因为服务器产生新线程即可“脱离服务器”。在 Sun OS 5.x 中脱离的线程在退出的时候从不会与主线程控制重新同步(re-synchronize)或者重新接合(re-join)。此外,与进程相比,由于在上下文切换的开销上的减少,创建、执行以及终止一个线程更有效[7]。另外,共享全局数据对象也常常更加方便,因为要获取共享内存无需执行特殊的操作。

      传统的操作系统(如老版的 UNIX 和 Windows)对线程并没有提供足够的支持。例如,一些线程变体只允许每个进程一个显著的系统调用,其它的一些变体并允许多线程控制使用某些 OS APIs (例如,套接字或 RPC)。特别是,许多传统的 UNIX 和 标准 C 库例程并没有被设计成可重入的,这复杂化了其在多线程应用程序中的使用。

使用特权

评论回复
6
keer_zu|  楼主 | 2015-8-21 17:35 | 只看该作者
3.4 事件多路分发解决方案

     第四种方法利用通过 select和 poll系统调用而可用的事件多路分发功能。这些机制突破了上述其它几种解决方案的限制。Select 和 poll 都允许网络应用程序为发生在多个 I/O 描述符上的不同类型的事件而等待的时间有长有短,既无须要求轮询,也不要求多进程或多线程调用。这一部分概述 select和 poll 系统调用,草拟使用这两个调用的日志服务器守护进程的样例实现,并且用 Reactor(反应堆)的面向对象类库的优势对照现有事件多路分发服务的局限性。

3.4.1 Select和 poll 系统调用

      下面的段落 select调用(在图10中示出)和 poll调用(在图11中示出)的相似之处和不同点。这些调用支持基于 I/O 和基于定时器的事件多路分发。Select和 poll 的语法和语义在 [8] 中有极为详细的讲解。

      抛开它们不同的 APIs,select 和 poll 共享许多共有的特性。例如,它们都在一组 I/O 描述符上等待不同的输入、输出以及例外事件的发生,并且返回一个整型值指示有多少个事件发生。另外,这两个系统调用允许应用程序指定一个指示等待事件发生的最大时间的超时间隔。三个基本的超时间隔包括(1)“永久”等待,(即,直到I/O事件的发生或者信号中断系统调用),(2)等待一定的时间单元(既可以以秒/微秒(select),也可以用毫秒(poll)来衡量),和(3)执行“轮询”(即,检查所有的描述符,并且立即带着结果返回)。

      Select 和 poll 之间也有几个不同之处。例如,select 使用三个描述符组(descriptor set)(一个用于读,一个用于写,另一个用于例外),其当作一个位掩码(bit-mask)来实现,以减少所使用的空间量。位掩码中的每一位对应一个可能被允许检查特定I/O事件的描述符。另一方面,poll 函数多少更通用,并且接口少了一些绕弯。Poll API 一个 pollfd 结构的数组,一个此数组中结构数目的计数,以及一个超时值。数组中每一个 pollfd 结构包含(1)检查 I/O 事件的描述符(-1表示这一条结构应该被忽略),(2)感兴趣的事件(一个或多个)(例如,输入和输出情况下的各个属性),以及(3)在描述符(如,输入、输出、挂起,以及错误)上实际发生的事件(一个或多个),这些事件根据从 poll 系统调用的返回来决定是否被允许。注意,在 System V Release 4 之前的版本中,poll 只可用于像终端和网络接口这样的流(STREAM)设备。特别是,它并不可用于像 原始 UNIX 文件和目录这样的任意 I/O 描述符。Select 和 SVR4 poll 系统调用可作用于所有类型的 I/O 描述符之上。

3.4.2 基于 select 的日志服务器示例

      图 13 例示了一个使用 BSD select 系统调用来执行服务器日志守护进程的代码片断。这个服务器实现使用两个描述符组:(1)read_handles(其了解与活动的客户端连接相关的 I/O 描述符)和(2)temp_handles(其是一个通过“值传递”(value/result)给 select 系统调用的 read_handles 描述符组的副本)。最初,在 read_handles 描述符组中只有那个被打开的位对应那个“监听”来自客户日志守护进程的新的入连接请求。

      在初始化完成之后,主循环使用 temp_handles 作为其唯一的描述符组参数(因为服务器既无意处理“写”事件,也无意处理“例外”事件)调用 select。因为最终的参数是 NULL struct timeval * NULL 指针,select 调用阻塞,直到一个或者多个客户端发送日志记录或者请求新的连接(注意:如果发生中断,select 必须手工重新启动)。当 select 返回时,temp_handles 变量会被修改以指示哪一个描述符有挂起的日志记录数据,或者新的客户端连接请求。首先通过迭代整个 temp_handles 组来检查当前已经准备好读(注意: select 语义保证 recv 将不会阻塞在这个读操作上)的描述符来处理日志记录。Recv 函数在客户端关闭连接的时候返回 0。这将知会主服务器循环清除在 read_handles 组中代表那个连接的这个特定位。

      在所有挂起的日志记录都已经被处理完毕之后,服务器检查新的连接请求是否已经到达监听的 I/O 描述符。如果一个或者多个请求已经到达,那么它们将被接受,并且在 read_handles 描述符组中的对应位也会被打开。这一部分代码例示了 select 的“轮询”特性。例如,如果在 struct timeval 参数的两个域都被置 0,select 将检查打开的描述符,并且如果有挂起的连接请求,那么就立即返回以通知应用程序。注意,服务器如何使用 width 变量来了解最大的 I/O 描述符值。这个值限制 select 在每一个调用上必须监督的描述符数目。

int
main( void )
{
  // Create a server end-point.
  ACE_SOCK_Acceptor acceptor( ( ACE_INET_Addr )LOGGER_PORT );
  ACE_SOCK_Stream new_stream;

  ACE_HANDLE s_handle = acceptor.get_handle();
  ACE_HANDLE maxhandlepl = s_handle + 1;

  fd_set temp_handles;
  fd_set read_handles;

  FD_ZERO( &temp_handles );
  FD_ZERO( &read_handles );

  // Loop forever performing logging server processing.
  for (;;)
  {
    temp_handles = read_handles;  // structure assigment.

    // Wait for client I/O events.
    ACE_OS::select( maxhandlepl, &temp_handles, 0, 0 );

    // Handle pending logging records first (s_handle + 1
    // is guaranteed to be lowest client descriptior).
    for ( ACE_HANDLE handle = s_handle + 1; handle < maxhandlepl; handle++ )
    {
      if ( FD_ISSET( handle, &temp_handles ) )
      {
        ssize_t n = handle_logging_record( handle );
        // Guranteed not to block in this case!
        if ( n == -1 )
          ACE_DEBUG( ( LM_DEBUG, "logging failed" ) );
        else if ( n == 0 )
        {
          // Handle client connection shutdown.
          FD_CLR( handle, &read_handles );
          ACE_OS::close( handle );
          if ( handle + 1 == maxhandlepl )
          {
            // Skip past unused descriptors.
            while ( !FD_ISSET( --heandle, &read_handles ) )
              continue;
            maxhandlepl = handle + 1;
          }
        }
      }
    if ( FD_ISSET( s_handle, &temp_handles ) )
    {
      // Handle all pending connection requests
      // (note use of "polling" feature).
      while ( ACE_OS::select( s_handle + 1, &temp_handles, 0, 0, ACE_Time_Value::zero ) > 0 )
        if ( acceptor.accept( new_stream ) == -1 )
          ACE_DEBUG( ( LM_DEBUG, "accept" ) );
        else
        {
          handle = new_stream.get_handle();
          FD_SET( handle, &read_handles );
          if ( handle >= maxhandlepl )
            maxhandlepl = handle + 1;
        }
    }
  } // for (;;)

}
图13:使用 select API 的事件多路分发服务器

// Maximum per-process open I/O descriptos.
const int MAX_HANDLES = 200;

int
main( void )
{
  // Create a server end-point.
  ACE_SOCK_Acceptor acceptor( ( ACE_INET_Addr )LOGGER_PORT );

  struct pollfd poll_array[MAX_HANDLES];
  ACE_HANDLE s_handle = acceptor.get_handle();

  poll_array[0].fd = s_handle;
  poll_array[0].events = POLLIN;

  for ( int nhandles = 1;;)
  {
    // Wait for client I/O events.
    ACE_OS::poll( poll_array, nhandles );

    // Handles pending logging messages first
    // (poll_array[ i = 1].fd is guaranteed to be
    // lowest client descriptor).
    for ( int i = 1; i < nhandles; i++ )
    {
      if ( ACE_BIT_ENABLED( poll_array[i].revents, POLLIN )
      {
        char buf[BUFSIZ];
        sszie_t n_logging_record( poll_array[i].fd );
        // Guaranteed not to block in this case!
        if ( n == -1 )
          ACE_DEBUG( ( LM_DEBUG, "read failed" ) );
        else if ( n == 0 )
        {
          // Handle client connection shutdown.
          ACE_OS::close( poll_array[i].fd );
          poll_array[i].fd = poll_array[--nhandles].fd;
        }
      }
    }
    if ( ACE_BIT_ENABLED( poll_array[0].revents, POLLIN ) )
    {
      // Handle all pending connection requests
      // (note use of "polling" feature).
      while ( ACE_OS::poll( poll_array, 1, ACE_Time_Value::zero ) > 0 )
        if ( acceptor.accept( new_stream, &client ) == -1 )
          ACE_DEUBG( ( LM_DEBUG, "accept" ) );
        else
        {
          poll_array[nhandles].fd = POLLIN;
          poll_array[nhandles++].fd = new_stream.get_handle();
        }
    }
  }

}
图14:使用 poll API 的事件多路分发服务器

3.4.3 基于 poll 的日志服务器示例

      图14 在使用 select 的位置采用 System V UNIX poll 系统调用,重新实现服务器日志守护进程的主要处理循环。注意,这两个服务器的总体结构上几乎是一致的。不过,还是要进行许多细微的修改以适应 poll 接口。例如,不像 select (其针对读、写和例外有单独的位掩码),poll 使用一个单独的 pollfd 结构的数组。通常,poll API 比 select 更加通用,允许应用程序等待范围更广的事件(如,“优先级-分类”的 I/O 事件和信号)。不过,这两个例子中的总体复杂度和源代码的行数几乎相同。

3.4.4 现有事件多路分发服务的局限

    事件多路分发服务解决几个以上所展示的几种途径的局限性。例如,基于事件多路分发的服务器日志守护进程既不要求“忙等”,也不要求单独的进程创建。不过,仍然有许多与直接使用 select 或 poll 相关的许多问题。这一部分说明一些剩余的问题,并解释 Reactor 是如何进行设计以解决这些问题。

复杂并且易错的接口: 用于 select 和 poll 的接口非常通用,在单个的系统调用入口点组合了几个像“限时等待”以及多个 I/O 事件通知等服务。这个通用性降低了正确学习和使用 I/O 多路分发工具的复杂度。另一方面,Reactor 提供了一个更少秘密的 API,这个 API 由多个成员函数组成,其中的每一个执行单个定义好的活动。另外,

使用特权

评论回复
7
dong_abc| | 2015-8-22 16:35 | 只看该作者
libev / libevent也不错。

使用特权

评论回复
8
keer_zu|  楼主 | 2015-8-24 09:48 | 只看该作者
dong_abc 发表于 2015-8-22 16:35
libev / libevent也不错。

libevent就是典型的reactor模式。

使用特权

评论回复
发新帖 我要提问
您需要登录后才可以回帖 登录 | 注册

本版积分规则

1352

主题

12436

帖子

53

粉丝