io_multiplexer


Network<ToolKit> includes a class called os_io_multiplexer for multiplexing socket operations on multiple ports. The three possible types of interesting condition that it can wait for are listed below.

An os_io_multiplexer contains a set of socket descriptors that are considered for each condition type. To set or access the descriptors that are scanned for read, write, and exception conditions, use read_scan() , write_scan() , and exception_scan() .

When you execute select() with an optional time-out parameter, select() blocks the process until at least one interesting condition is detected or a time-out occurs. select() then returns the number of descriptors with a pending condition. To determine which descriptors are associated with the detected conditions, use readable() , writable() , and exceptional() .

The following example spawns a thread to run server_func() , which creates a collection of connection servers and an I/O multiplexer interested in the readable condition. This I/O multiplexer detects connection requests on any of the registered connection ports. If no connection requests are received after a second, the server prints a message, and then loops and restarts the select() .

The example then creates three additional threads a few seconds apart: one thread to run client_func1() and the remaining threads to run client_func2() . Each client connects to a different port on the server. The server uses select() to detect the connection request and spawns a child process to service the connection. After spawning a child for each pending connection request, the server reenters the select() and waits for the next client. After both client threads complete, the example terminates the server and exits.

Example <ospace/network/examples/iomux1.cpp>
#include <iostream>
#include <string>
#include <ospace/network.h>
#include <ospace/stream.h>
#include <ospace/thread.h>
#include <ospace/uss/std/string.h>

int
tcp_bounce( os_tcp_socket& socket, const char* name, const char* str )
  {
  cout << name << " sends: " << str << endl;
  os_bstream stream( socket ); // Create a binary stream on the socket.
  stream << string( str ); // Send a string.
  string strng;
  stream >> strng; // Read the response.
  cout << name << " receives: " << strng << endl;
  return 0;
  }

void*
client_func1( void* )
  {
  os_tcp_socket socket;
  socket.connect_to( os_socket_address( 7001 ) );
  return (void*) tcp_bounce( socket, "Client1", "hello" );
  }

void*
client_func2( void* )
  {
  os_tcp_socket socket;
  socket.connect_to( os_socket_address( 7003 ) );
  return (void*) tcp_bounce( socket, "Client2", "goodbye" );
  }

void*
transact( void* arg )
  {
  os_tcp_socket* socket = (os_tcp_socket*) arg;
  os_bstream stream( *socket );
  string str;
  stream >> str;
  cout << "Server received: " << str << endl;
  cout << "Server sends: " << str << endl;
  stream << str;
  delete socket;
  return 0;
  }

void
serve( vector< os_sock_t >& servers, os_io_multiplexer& mux )
  {
  // Obtain vector of readable descriptors.
  vector< os_sock_t > readable = mux.readable();
  vector< os_sock_t >::iterator iter;
  cout << "Server detects " << readable.size()
    << " pending requests." << endl;

  // Iterate through all readable descriptors.
  for ( iter = readable.begin(); iter != readable.end(); iter++ )
    {
    // Service connection.
    os_tcp_connection_server server( *iter );
    server.auto_close( false );
    cout << "Server processing request on port ";
    cout  << server.socket_address().port() << endl;
    os_tcp_socket* socket = new os_tcp_socket;

    if ( server.accept( *socket ) )
      {
      os_thread dispatch( transact, socket );
      }
    }
  }

void* server_func( void* )
  {
  os_tcp_connection_server server1( os_socket_address( 7001 ) );
  os_tcp_connection_server server2( os_socket_address( 7002 ) );
  os_tcp_connection_server server3( os_socket_address( 7003 ) );

  // Build collection of all connection server descriptors.
  vector< os_sock_t > servers;
  servers.push_back( server1.descriptor() );
  servers.push_back( server2.descriptor() );
  servers.push_back( server3.descriptor() );

  // Build an I/O multiplexer to scan the descriptors.
  os_io_multiplexer mux;
  mux.read_scan( servers );

  // Loop, servicing a connection on any connection server.
  while( true )
    {
    int count = mux.select( 1 ); // Demonstrate timeout capability.
    if (count == 0)
      cout << "Time out" << endl;
    else
      serve( servers, mux );
    }
  cout << "Server shutting down." << endl;
  return 0;
  }

void
main()
  {
  os_network_toolkit init_network;
  os_streaming_toolkit init_streaming;
  os_thread_toolkit init_thread;

  os_thread server( server_func );
  os_thread client1( client_func1 );
  os_this_thread::sleep( 3 ); // To force a mux timeout.
  os_thread client2( client_func2 ); // Spawn another client process.
  os_thread client3( client_func2 ); // Spawn another client process.
  os_this_thread::wait_for_thread( client1 );
  os_this_thread::wait_for_thread( client2 );
  os_this_thread::wait_for_thread( client3 );
  cout << "Both threads are done, terminating the server." << endl;
  }

Client1 sends: hello
Server detects 1 pending requests.
Server processing request on port 7001
Server received: hello
Server sends: hello
Client1 receives: hello
Time out
Time out
Server detects 1 pending requests.
Server processing request on port 7003
Client2 sends: goodbye
Server detects 1 pending requests.
Server processing request on port 7003
Client2 sends: goodbye
Server received: goodbye
Server sends: goodbye
Server received: goodbye
Server sends: goodbye
Client2 receives: goodbye
Client2 receives: goodbye
Both threads are done, terminating the server.

Copyright©1994-2026 Recursion Software LLC
All Rights Reserved - For use by licensed users only.