dispatcher


The Reactor pattern provides a framework for demultiplexing events and dispatching the appropriate service handlers.

The class os_dispatcher builds upon the os_io_multiplexer class by implementing an event demultiplexing and dispatching mechanism. An os_io_multiplexer object signals the handles that have a pending condition or an event (read, write, except, timeout). The os_dispatcher object demultiplexes these events to associated event handlers. The os_dispatcher class contains an event_t enumerator type which has four enumerators - read_event , write_event , except_event , and timeout_event for specifying handles to be monitored for read, write, exceptional, and timeout conditions. To monitor an event on a particular handle, a service handler is registered with the dispatcher object. Upon signaling of the event on the particular handle, the associated service handler's callback function is called to notify the occurrence of the event. The callback function should process the event and return control quickly. Since dispatching and event handling happens in a single thread of control time consuming event handler functions should be executed in a separate thread. Usually, a single instance of the os_dispatcher object can serve as a central event dispatcher for an application. The instance() static method creates a singleton dispatcher which gets deleted when the process ends.

The os_io_multiplexer class uses the select() library function to monitor the handles for events. Since the os_dispatcher class uses the os_io_multiplexer class in its implementation, only handles that are accepted by select() library function are allowed to be registered for event notification.

The os_tcp_acceptor , os_tcp_connector , and os_dispatcher classes can be collectively used to synchronously or asynchronously establish connections and perform service processing at the two connection endpoints.

Use of member template feature in the register handler method gives the flexibility of not having the handler classes to inherit from a specific interface class.

The following example consists of two parts. The first part is the server (dispatcher1s.cpp) and the second part is the client (dispatcher1c.cpp). On the server side, an os_echo_acceptor object is created in the main() routine. The acceptor registers a callback function accept() with the dispatcher to monitor read events on its descriptor. The dispatcher object calls the acceptor's accept() method when a connect request arrives. The accept() method creates an os_echo_server handler object to handle the connection. After the connection is accepted, the handler's void* run() method is executed to service the connection. The os_echo_server handler object registers a callback function handle_read() , with the dispatcher to monitor read events on its descriptor. When a readable condition is detected on this descriptor, the dispatcher invokes the handler's handle_read() function. The handle_read() serves the connection, unregisters the descriptor from the dispatcher and deletes itself. On the client side, the os_echo_client object's void* run() method sets its socket in a non blocking mode and tries to connect with the server. If the connection completes immediately, the socket is set back to non-blocking mode and data is written to the socket using the write() method. After writing, a callback function handle_read() is registered with the dispatcher to monitor readable condition (response from the server) on its socket descriptors. When the dispatcher detects a readable condition on the handler socket, it will invoke the handle_read() function. The handle_read() function reads the response from the server and unregisters the descriptor from the dispatcher and deletes itself. If the connection cannot be completed immediately in the void* run()  method, connection complete callback function connect_complete() is registered with the dispatcher to monitor the writeable/exceptional conditions on the socket descriptor. When the connection completes, a writeable/exeptional condition is detected on the socket descriptor by the dispatcher and the dispatcher invokes the connect_complete() method to notify the os_echo_client handler object. For asynchronous connections, the connect_complete() function should check to see if the socket is connected without errors. It is not connected the pending error can be retrieved using the get_and_clear_error() methods. Once the connection is complete the processing is same as above.

Example <ospace/framework/examples/dispatcher1s.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

class os_echo_server
  {
  public:
    void* run();
    void handle_read( os_sock_t );
    void serve();
    os_tcp_socket& tcp_socket();

  private:
    os_tcp_socket socket_;
  };


void*
os_echo_server::run()
  {
  os_singleton< os_dispatcher >::instance()->register_handler
    (
    this,
    os_dispatcher::read_event,
    &os_echo_server::handle_read,
    socket_.descriptor()
    );
  return 0;
  }


void
os_echo_server::handle_read( os_sock_t handle )
  {
  serve();

  // Since we are done serving the client, unregister
  // the handler from the dispatcher and delete it.
  os_singleton< os_dispatcher >::instance()->unregister_handler
    (
    os_dispatcher::read_event, handle
    );

  delete this; // Created by the acceptor.
  }


void
os_echo_server::serve()
  {
  try
    {
    os_bstream stream( socket_ );
    string message;
    stream >> message;
    cout << "Server received: " << message << endl;
    cout << "Server sends: " << message << endl;
    stream << message;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }
  }


os_tcp_socket&
os_echo_server::tcp_socket()
  {
  return socket_;
  }



class os_echo_acceptor
  {
  public:
    os_echo_acceptor
      (
      const os_socket_address& address,
      os_dispatcher* dispatcher
      );

    ~os_echo_acceptor();

    void accept( os_sock_t );

  private:
    os_tcp_connection_server server_;
    os_dispatcher* dispatcher_;
  };


os_echo_acceptor::os_echo_acceptor
  (
  const os_socket_address& address,
  os_dispatcher* dispatcher
  ) :
  server_( address ),
  dispatcher_( dispatcher )
  {
  dispatcher_->register_handler
    (
    this,
    os_dispatcher::read_event,
    &os_echo_acceptor::accept,
    server_.descriptor()
    );
  }

os_echo_acceptor::~os_echo_acceptor()
  {
  dispatcher_->unregister_handler
    (
    os_dispatcher::read_event, server_.descriptor()
    );
  }


// Called by the dispatcher.
void
os_echo_acceptor::accept( os_sock_t )
  {
  os_echo_server* handler = new os_echo_server;
  server_.accept( handler->tcp_socket() );
  handler->run();
  }



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;

  try
    {
    // Run in the same thread as the server.
    os_dispatcher* dispatcher = os_singleton< os_dispatcher >::instance();
    os_echo_acceptor server( os_socket_address( 7001 ), dispatcher );

    bool timed_out = false;
    while ( !timed_out )
      timed_out = dispatcher->dispatch( 10 /* secs */ );
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Server received: hello
Server sends: hello
Server received: goodbye
Server sends: goodbye

Example <ospace/framework/examples/dispatcher1c.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

class os_echo_client
  {
  public:
    os_echo_client
      (
      int port_,
      const string& name,
      const string& message
      );

    void* run();
    void handle_read( os_sock_t );
    void connect_complete( os_sock_t );
    void read();
    void write();
    os_tcp_socket& tcp_socket();

  private:
    int port_;
    string name_;
    string message_;
    os_tcp_socket socket_;
  };


os_echo_client::os_echo_client
  (
  int port,
  const string& name,
  const string& message
  ) :
  port_( port ),
  name_( name ),
  message_( message )
  {
  }


void*
os_echo_client::run()
  {
  socket_.non_blocking( true );
  if ( socket_.connect_to( os_socket_address( 7001 ) ) == false )
    {
    socket_.non_blocking( false );
    os_singleton< os_dispatcher >::instance()->register_handler
      (
      this,
      os_dispatcher::write_event,
      &os_echo_client::connect_complete,
      socket_.descriptor()
      );
    os_singleton< os_dispatcher >::instance()->register_handler
      (
      this,
      os_dispatcher::except_event,
      &os_echo_client::connect_complete,
      socket_.descriptor()
      );
    }
  else
    {
    socket_.non_blocking( false );
    write();
    os_singleton< os_dispatcher >::instance()->register_handler
      (
      this,
      os_dispatcher::read_event,
      &os_echo_client::handle_read,
      socket_.descriptor()
      );
    }
  return 0;
  }


// This method is called when the connection completes or an error occurs.
void
os_echo_client::connect_complete( os_sock_t handle )
  {
  os_singleton< os_dispatcher >::instance()->unregister_handler
    (
    os_dispatcher::write_event, handle
    );
  os_singleton< os_dispatcher >::instance()->unregister_handler
    (
    os_dispatcher::except_event, handle
    );

  // For asynchronous connections, always check if the
  // socket was connected without errors.
  if ( !socket_.connected() )
    {
    cerr << "os_echo_client connection failed due to errno = "
      << socket_.get_and_clear_error() << endl;
    return;
    }

  write();

  os_singleton< os_dispatcher >::instance()->register_handler
    (
    this,
    os_dispatcher::read_event,
    &os_echo_client::handle_read,
    socket_.descriptor()
    );
  }


void
os_echo_client::handle_read( os_sock_t handle )
  {
  read();

  // Since we are done receiving the request from the server,
  // unregister the handler from the dispatcher and delete it.
  os_singleton< os_dispatcher >::instance()->unregister_handler
    (
    os_dispatcher::read_event, handle
    );

  delete this; // Created by the acceptor.
  }


void
os_echo_client::write()
  {
  try
    {
    cout << name_ << " sends: " << message_ << endl;
    os_bstream stream( socket_ ); // Create a binary stream on the socket.
    stream << message_;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }
  }


void
os_echo_client::read()
  {
  try
    {
    os_bstream stream( socket_ ); // Create a binary stream on the socket.
    string response;
    stream >> response;
    cout << name_ << " receives: " << response << endl;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }
  }



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;

  try
    {
    // Create clients.
    os_dispatcher* dispatcher = os_singleton< os_dispatcher >::instance();
    os_echo_client* client1 = new os_echo_client( 7001, "Client1", "hello" );
    os_echo_client* client2 = new os_echo_client( 7001, "Client2", "goodbye" );

    client1->run();
    client2->run();

    bool timed_out = false;
    while ( !timed_out )
      timed_out = dispatcher->dispatch( 5 /* secs */ );
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Client1 sends: hello
Client2 sends: goodbye
Client1 receives: hello
Client2 receives: goodbye

The following example is similar to the previous example but is much simpler. It uses the os_tcp_acceptor and os_tcp_connector classes on the server and the client sides to simplify connection establishments. On the server side (dispatcher2s.cpp), the use of os_tcp_acceptor object removes the burden of monitoring the server socket for incoming connections from the application. The os_echo_acceptor object when constructed with a dispatcher, automatically register's itself with the dispatcher. The dispatcher later notifies the echo acceptor of incoming connections by calling the acceptor's handle_accept() method. The handle_accept() method executes the echo server object to service the connections. All this is done implicitly in the os_tcp_acceptor class. On the client side (dispatcher2c.cpp), the os_tcp_connector object removes the asynchronous handling burden from the application. The connector when constructed with dispatcher argument will automically register a connection completion dispatcher when asynchronous connections cannot be completed immediately.

Example <ospace/framework/examples/dispatcher2s.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

class os_echo_server : public os_handler_stream
  {
  public:
    void* run();
    void handle_read( os_sock_t );
    void serve();
  };


void*
os_echo_server::run()
  {
  os_singleton< os_dispatcher >::instance()->register_handler
    (
    this,
    os_dispatcher::read_event,
    &os_echo_server::handle_read,
    socket_.descriptor()
    );
  return 0;
  }


void
os_echo_server::handle_read( os_sock_t handle )
  {
  serve();

  // Since we are done serving the client, unregister
  // the handler from the dispatcher and delete it.
  os_singleton< os_dispatcher >::instance()->unregister_handler
    (
    os_dispatcher::read_event, handle
    );

  delete this; // Created by the acceptor.
  }


void
os_echo_server::serve()
  {
  try
    {
    os_bstream stream( socket_ );
    os_string message;
    stream >> message;
    cout << "Server received: " << message << endl;
    cout << "Server sends: " << message << endl;
    stream << message;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }
  }



typedef os_tcp_acceptor
  <
  os_echo_server,
  os_same_thread_executor,
  os_creator< os_echo_server >
  > os_echo_acceptor;



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;

  try
    {
    // Run in the same thread as the server.
    os_same_thread_executor executor;
    os_dispatcher* dispatcher = os_singleton< os_dispatcher >::instance();
    os_echo_acceptor server
      (
      os_socket_address( 7001 ), &executor, dispatcher
      );

    bool timed_out = false;
    while ( !timed_out )
      timed_out = dispatcher->dispatch( 10 /* secs */ );
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Server received: hello
Server sends: hello
Server received: goodbye
Server sends: goodbye

Example <ospace/framework/examples/dispatcher2c.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

class os_echo_client : public os_handler_stream
  {
  public:
    os_echo_client( const os_string& name, const os_string& message );

    void* run();
    void handle_read( os_sock_t );
    void read();
    void write();

  private:
    os_string name_;
    os_string message_;
  };


os_echo_client::os_echo_client
  (
  const os_string& name,
  const os_string& message
  ) :
  name_( name ),
  message_( message )
  {
  }


void*
os_echo_client::run()
  {
  // For asynchronous connections, always check if the
  // socket was connected without errors.
  if ( !socket_.connected() )
    {
    cerr << "os_echo_client connection failed due to errno = "
      << socket_.get_and_clear_error() << endl;
    return (void*)1;
    }

  write();
  os_singleton< os_dispatcher >::instance()->register_handler
    (
    this,
    os_dispatcher::read_event,
    &os_echo_client::handle_read,
    socket_.descriptor()
    );
  return 0;
  }


void
os_echo_client::handle_read( os_sock_t handle )
  {
  read();

  // Since we are done receiving the request from the server,
  // unregister the handler from the dispatcher and delete it.
  os_singleton< os_dispatcher >::instance()->unregister_handler
    (
    os_dispatcher::read_event, handle
    );

  delete this; // Created by the acceptor.
  }


void
os_echo_client::write()
  {
  try
    {
    cout << name_ << " sends: " << message_ << endl;
    os_bstream stream( socket_ ); // Create a binary stream on the socket.
    stream << message_;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }
  }


void
os_echo_client::read()
  {
  try
    {
    os_bstream stream( socket_ ); // Create a binary stream on the socket.
    os_string response;
    stream >> response;
    cout << name_ << " receives: " << response << endl;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }
  }



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;

  try
    {
    // Create clients.
    os_dispatcher* dispatcher = os_singleton< os_dispatcher >::instance();
    os_echo_client* client1 = new os_echo_client( "Client1", "hello" );
    os_echo_client* client2 = new os_echo_client( "Client2", "goodbye" );

    // Connect asychronously.
    os_tcp_connector connector( dispatcher );
    os_same_thread_executor executor;
    connector.connect
      (
      client1, os_socket_address(7001), &executor, os_tcp_connector::async
      );

    connector.connect
      (
      client2, os_socket_address(7001), &executor, os_tcp_connector::async
      );

    bool timed_out = false;
    while ( !timed_out )
      timed_out = dispatcher->dispatch( 5 /* secs */ );
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Client1 sends: hello
Client2 sends: goodbye
Client1 receives: hello
Client2 receives: goodbye

The following example illustrates how the use of os_tcp_acceptor and os_dispatcher simplifies monitoring, dispatching, establishing, and servicing connections when compared to just using the os_io_multiplexer and os_tcp_connection_server classes. The functionality in this example is same as the os_io_multiplexer example illustrated in iomux2c.cpp and iomux2c.cpp but is much simpler. The dispatcher2s.cpp uses a separate os_tcp_acceptor object for accepting and servicing connections on each different port. Each of the acceptors is registered with the dispatcher so that they are notified when connection requests are pending on their descriptors. Upon accepting the connections, the server handlers are executed in their own thread of control.

Example <ospace/framework/examples/dispatcher3s.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/thread.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

class os_server : public os_handler_stream
  {
  public:
    void* run();
  };


void*
os_server::run()
  {
  try
    {
    os_bstream stream( socket_ );
    os_string message;
    stream >> message;
    cout << "Server received: " << message << endl;
    cout << "Server sends: " << message << endl;
    stream << message;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  delete this; // Created in the acceptor.
  return 0;
  }



typedef os_tcp_acceptor
  <
  os_server,
  os_new_thread_executor,
  os_creator< os_server >
  >
  os_server_acceptor;



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;
  os_thread_toolkit init_thread;

  try
    {
    os_dispatcher* dispatcher = os_singleton< os_dispatcher >::instance();
    os_new_thread_executor executor;

    // Create servers.
    os_server_acceptor server1
      (
      os_socket_address( 7001 ), &executor, dispatcher
      );

    os_server_acceptor server2
      (
      os_socket_address( 7002 ), &executor, dispatcher
      );

    os_server_acceptor server3
      (
      os_socket_address( 7003 ), &executor, dispatcher
      );

    // Loop dispatching events until no activity for 10 secs.
    bool timed_out = false;
    while ( !timed_out )
      timed_out = dispatcher->dispatch( 10 );
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Server received: hello
Server sends: hello
Server received: goodbye
Server sends: goodbye
Server received: thankyou
Server sends: thankyou

Example <ospace/framework/examples/dispatcher3c.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/thread.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

class os_client
  {
  public:
    os_client( int port, const string& name, const string& message );
    void* run();

  private:
    int port_;
    string name_;
    string message_;
    os_tcp_socket socket_;
  };


os_client::os_client
  (
  int port,
  const string& name,
  const string& message
  ) :
  port_( port ),
  name_( name ),
  message_( message )
  {
  }


void*
os_client::run()
  {
  try
    {
    socket_.connect_to( os_socket_address( port_ ) );
    cout << name_ << " sends: " << message_ << endl;
    os_bstream stream( socket_ ); // Create a binary stream on the socket.
    stream << message_;
    string response;
    stream >> response;
    cout << name_ << " receives: " << response << endl;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }
  delete this;
  return 0;
  }



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;
  os_thread_toolkit init_thread;

  try
    {
    // Create clients.
    os_thread client1( new os_client( 7001, "Client1", "hello" ) );
    os_thread client2( new os_client( 7003, "Client2", "goodbye" ) );
    os_thread client3( new os_client( 7003, "Client3", "thankyou" ) );

    os_this_thread::wait_for_thread( client1 );
    os_this_thread::wait_for_thread( client2 );
    os_this_thread::wait_for_thread( client3 );
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Client1 sends: hello
Client2 sends: goodbye
Client3 sends: thankyou
Client1 receives: hello
Client2 receives: goodbye
Client3 receives: thankyou

Copyright©1994-2026 Recursion Software LLC
All Rights Reserved - For use by licensed users only.