tcp_acceptor, tcp_connector


The Acceptor-Connector pattern decouples the responsibility of connection establishment and service initialization in a network application from the processing that the application is designed to perform. The benefit is a more flexible, more reusable application resulting in less maintenance.

tcp_acceptor

The os_tcp_acceptor template class extends the os_tcp_connection_server class. The os_tcp_connection_server class allows accepting incoming connections on a particular address. The os_tcp_acceptor extends it by providing functionality for creating and executing service handlers to serve the accepted connections. The type of the service handler, its creation, and its execution strategies can be specified as template instantiation parameters to the os_tcp_acceptor class.

There are three template parameters. The first parameter, ServiceHandler specifies the service handler object type. A service handler object contains a void* run() method that implements the functionality to service the client after accepting the connection. The second parameter, Executor specifies the concurrency strategy to use to execute the ServiceHandler 's void* run() method after the connection is accepted. Refer to the classes os_same_thread_executor , os_new_thread_executor , and os_thread_pool_executor for more information on the concurrency strategies. Apart from these, users can define and use their own concurrency strategy classes like separate process, process pool as an Executor class. The third parameter Creator is a function object which is used to create the ServiceHandler object. The default Creator is os_creator which creates a default constructed ServiceHandler object on the heap and returns it. The ServiceHandler object should delete itself after it is done serving the client.

Connection requests are handled in the handle_accept() method. A ServiceHandler object is created using the Creator function object. The created ServiceHandler object should contain an os_tcp_socket object member which is used to establish the connection with the client socket. A ServiceHandler object could inherit from os_handler_stream class for the os_tcp_socket object or alternatively implement the functionality of os_handler_stream class directly in its own class without inheriting from it. After the connection is established the ServiceHandler object is passed to the Executor object for execution. The Executor object calls the void* run() method of the ServiceHandler to initiate service handling.

The application can wait for the connection requests using the handle_accept() method or through the os_dispatcher instance. A single thread application could use os_dispatcher class and a multithread application could use a separate thread that repeated calls handle_accept() to accept connections and initiate service handlers.

tcp_connector

The os_tcp_connector class implements the strategy for initiating a connection and executing a connect handler when the connection is established. The connection can be initiated synchronously or asynchronously. The class os_dispatcher is used for asynchronous connection completion notifications.

The connect() method is used to connect to a server socket with a particular address. If the connection mode is os_tcp_connector::sync , the calling thread blocks until the connection is established. If the connection mode is os_tcp_connector::async the calling thread will not block if the connection cannot be made immediately and an internal connection completion handler is registered with the dispatcher which is notified when the connection competes.

The connect() method is a member template function that takes two template arguments. The first template argument ConnectHandler specifies the type of the service handler object that is executed upon connection establishment. For establishing the connection, it should contain a os_tcp_socket socket object member. This can be obtained by inheriting the ConnectHandler class object from the os_handler_stream class. Alternatively, the os_handler_stream functionality can be implemented directly in the ConnectHandler class without inheriting from os_handler_stream class. The second template argument Executor specifies the type of the concurrency strategy object used to execute the ConnectHandler object's void* run() method after the connection is established. Refer to the classes os_same_thread_executor , os_new_thread_executor , and os_thread_pool_executor for more information on the concurrency strategies. Apart from these, users can define and use their own concurrency strategy classes like separate process, process pool as the executor class. The void* run() method of the ConnectHandler should implement the service offered/requested once the connection is established.

The os_tcp_acceptor , os_tcp_connector , and os_dispatcher classes can be collectively used to synchronously or asynchronously establish connections and perform service processing at the two connection endpoints.

Use of member template feature gives the flexibility of not having the various classes to inherit from a specific interface class.

In the following example, a simple iterative server is implemented using the os_tcp_acceptor class. The example consists of two parts. A server (acceptor1s.cpp) and the client (acceptor1c.cpp). The server must be executed before the client. On the server side, the os_echo_acceptor object accepts connection requests on port 7001. The main program loops calling the acceptors handle_accept() member function to accept and serve incoming connections. In the handle_accept() method, the acceptor object creates an os_echo_server service handler object, waits for a connection request, accepts it and later executes the echo server handler object to service the established connections. The echo server handler object is executed in the acceptor's thread of control using the os_same_thread_executor concurrency strategy. The echo server handler object in its void* run() method reads a message on its socket stream and echoes it back before exiting. The echo server object has to delete itself after servicing since it is created by the acceptor on the heap. On the client side, two os_echo_client handler objects are created and are connected to the server synchronously using the os_tcp_connector object. Upon connection establishment, the TCP connector object executes the handlers in its own thread of control using the os_same_thread_executor concurrency strategy.

Example <ospace/framework/examples/acceptor1s.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

class os_echo_server : public os_handler_stream
  {
  public:
    void* run();
  };


void*
os_echo_server::run()
  {
  try
    {
    os_bstream stream( socket_ );
    string message;
    stream >> message;
    cout << "Server received: " << message << endl;
    cout << "Server sends: " << message << endl;
    stream << message;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  delete this; // Created by the acceptor.
  return 0;
  }



typedef os_tcp_acceptor
  <
  os_echo_server,
  os_same_thread_executor,
  os_creator< os_echo_server >
  >
  os_echo_acceptor;



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;

  try
    {
    // Run in the same thread as the server.
    os_same_thread_executor executor;
    os_echo_acceptor server( os_socket_address( 7001 ), &executor );

    cout << "Starting the echo server, type Ctrl-C to exit" << endl;
    while ( true )
      server.handle_accept();
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Starting the echo server, type Ctrl-C to exit
Server received: hello
Server sends: hello
Server received: goodbye
Server sends: goodbye
^C

Example <ospace/framework/examples/acceptor1c.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

class os_echo_client : public os_handler_stream
  {
  public:
    os_echo_client( const os_string& name, const os_string& message );
    void* run();

  private:
    string name_;
    string message_;
  };


os_echo_client::os_echo_client
  (
  const string& name,
  const string& message
  ) :
  name_( name ),
  message_( message )
  {
  }


void*
os_echo_client::run()
  {
  try
    {
    cout << name_ << " sends: " << message_ << endl;
    os_bstream stream( socket_ ); // Create a binary stream on the socket.
    stream << message_;
    string response;
    stream >> response;
    cout << name_ << " receives: " << response << endl;
    delete this;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }
  return 0;
  }



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;

  try
    {
    // Create clients.
    os_echo_client* client1 = new os_echo_client( "Client1", "hello" );
    os_echo_client* client2 = new os_echo_client( "Client2", "goodbye" );

    // Connect synchronously.
    os_tcp_connector connector;
    os_same_thread_executor executor;
    connector.connect( client1, os_socket_address(7001), &executor );
    connector.connect( client2, os_socket_address(7001), &executor );
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Client1 sends: hello
Client1 receives: hello
Client2 sends: goodbye
Client2 receives: goodbye

The following example implements a simple concurrent server. It is same as the previous example, except that the os_new_thread_executor concurrency strategy is used instead of the os_same_thread_executor concurrency strategy as in the previous example. The os_echo_server handlers on the server side and the os_echo_client handlers on the client side execute in their own thread of control. Externalizing the concurrency strategy makes it very simple to change the execution strategies from a simple iterative to several different concurrency approaches such as separate threads, thread pools, separate process, process pools, etc.

Example <ospace/framework/examples/acceptor2s.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/thread.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

class os_echo_server : public os_handler_stream
  {
  public:
    void* run();
  };


void*
os_echo_server::run()
  {
  try
    {
    os_bstream stream( socket_ );
    string message;
    stream >> message;
    cout << "Server received: " << message << endl;
    cout << "Server sends: " << message << endl;
    stream << message;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  delete this; // Created by the acceptor.
  return 0;
  }



typedef os_tcp_acceptor
  <
  os_echo_server,
  os_new_thread_executor,
  os_creator< os_echo_server >
  >
 os_echo_acceptor;



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;

  try
    {
    // Run each handler in a separate thread.
    os_new_thread_executor executor;
    os_echo_acceptor server( os_socket_address( 7002 ), &executor );

    cout << "Starting the echo server, type Ctrl-C to exit" << endl;
    while ( true )
      server.handle_accept();
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Starting the echo server, type Ctrl-C to exit
Server received: hello
Server sends: hello
Server received: goodbye
Server sends: goodbye
^C

Example <ospace/framework/examples/acceptor2c.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/thread.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

class os_echo_client : public os_handler_stream
  {
  public:
    os_echo_client( const os_string& name, const os_string& message );
    void* run();

  private:
    string name_;
    string message_;
  };


os_echo_client::os_echo_client
  (
  const string& name,
  const string& message
  ) :
  name_( name ),
  message_( message )
  {
  }


void*
os_echo_client::run()
  {
  try
    {
    cout << name_ << " sends: " << message_ << endl;
    os_bstream stream( socket_ ); // Create a binary stream on the socket.
    stream << message_;
    string response;
    stream >> response;
    cout << name_ << " receives: " << response << endl;
    delete this;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }
  return 0;
  }



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;

  try
    {
    // Create clients.
    os_echo_client* client1 = new os_echo_client( "Client1", "hello" );
    os_echo_client* client2 = new os_echo_client( "Client2", "goodbye" );

    // Connect synchronously.
    os_tcp_connector connector;
    os_new_thread_executor executor;
    connector.connect( client1, os_socket_address(7002), &executor );
    connector.connect( client2, os_socket_address(7002), &executor );

    os_this_thread::sleep( 5 );
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Client1 sends: hello
Client2 sends: goodbye
Client1 receives: hello
Client2 receives: goodbye

The following example illustrates defining:

The example uses several acronym server and acronym client objects. On the server side (acceptor4s.cpp) in the main() function, an os_acronym_acceptor object is created bounded to port 7002. Later, two other acronym acceptor objects are created using the server socket descriptor of the first acronym acceptor. So, there are three acronym acceptor objects monitoring the same socket for incoming connections. The three acceptors are executed in three different threads. Each thread will execute the acceptor's void* run() method to accept and service connections. Since more than on thread will be calling accept()system call at the same time on the same server socket descriptor, the accept()system call must be serialized for performance reasons and for portability across different platforms. Hence, the acceptor objects are constructed with a lock accept call set to true (fifth argument of the constructor). After accepting connections, the acceptor objects execute the acronym server handler objects using the same thread pool executor object. On the client side (acceptor4c.cpp), several acronym client objects are created and enqueued to a thread pool. Each acronym client when executed will connect to the server with port number 7002 and a get a value for a particular acronym.

Example <ospace/framework/examples/acceptor4s.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/thread.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

typedef map< string, string, less< string > >  map_string;


class os_acronym_server : public os_handler_stream
  {
  public:
    os_acronym_server( const map_string& elements );
    void* run();

  private:
    map_string elements_;
  };


os_acronym_server::os_acronym_server( const map_string& elements ) :
  elements_( elements )
  {
  }


void*
os_acronym_server::run()
  {
  try
    {
    os_bstream stream( socket_ );
    string acronym;
    stream >> acronym;
    cout << "Server received: " << acronym << endl;
    cout << "Server sends: " << elements_[ acronym ] << endl;
    stream << elements_[ acronym ];
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  delete this; // Created by the acceptor.
  return 0;
  }



class os_acronym_server_creator
  {
  public:
    os_acronym_server_creator( const map_string* elements = 0 );
    os_acronym_server* operator()() const;

  private:
    const map_string* elements_;
  };


os_acronym_server_creator::os_acronym_server_creator
  (
  const map_string* elements
  ) :
  elements_( elements )
  {
  }


os_acronym_server*
os_acronym_server_creator::operator()() const
  {
  return new os_acronym_server( *elements_ );
  }



typedef os_tcp_acceptor
  <
  os_acronym_server,
  os_thread_pool_executor,
  os_acronym_server_creator
  >
  os_acronym_acceptor;



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;

  try
    {
    map_string acronyms;
    acronyms[ "ORB" ] = "Object Request Broker";
    acronyms[ "FAQ" ] = "Frequently Asked Questions";
    acronyms[ "OMG" ] = "Object Management Group";

    os_acronym_server_creator creator( &acronyms );
    os_thread_pool_executor thread_pool;

    // Create multiple servers listening on the same port no.
    os_acronym_acceptor server1
      (
      os_socket_address( 7002 ),
      &thread_pool,
      0,   // No dispatcher.
      creator,
      true // Lock accept call.
      );

    os_acronym_acceptor server2
      (
      server1.descriptor(),
      &thread_pool,
      0,   // No dispatcher.
      creator,
      true // Lock accept call.
      );

    os_acronym_acceptor server3
      (
      server1.descriptor(),
      &thread_pool,
      0,   // No dispatcher.
      creator,
      true // Lock accept call.
      );


    cout << "Starting the acronym servers, type Ctrl-C to exit" << endl;

    os_thread thread1( &server1 );
    os_thread thread2( &server2 );
    os_thread thread3( &server3 );

    os_this_thread::wait_for_any_thread();
    thread_pool.wait_for_completion();
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Starting the acronym servers, type Ctrl-C to exit
Server received: FAQ
Server sends: Frequently Asked Questions
Server received: OMG
Server sends: Object Management Group
Server received: FAQ
Server sends: Frequently Asked Questions
Server received: ORB
Server sends: Object Request Broker
Server received: OMG
Server sends: Object Management Group
Server received: FAQ
Server sends: Frequently Asked Questions
^C

Example <ospace/framework/examples/acceptor4c.cpp>
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/thread.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>

class os_acronym_client
  {
  public:
    os_acronym_client
      (
      int port,
      const string& name,
      const string& acronym
      );

    void* run();

  private:
    int port_;
    string name_;
    string acronym_;
    os_tcp_socket socket_;
  };


os_acronym_client::os_acronym_client
  (
  int port,
  const string& name,
  const string& acronym
  ) :
  port_( port ),
  name_( name ),
  acronym_( acronym )
  {
  }


void*
os_acronym_client::run()
  {
  try
    {
    socket_.connect_to( os_socket_address( port_ ) );
    cout << name_ << " sends: " << acronym_  << endl;
    os_bstream stream( socket_ ); // Create a binary stream on the socket.
    stream <<  acronym_;
    os_string response;
    stream >> response;
    cout << name_ << " receives: " << response << endl;
    delete this;
    }
  catch( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    delete this;
    }
  return 0;
  }



int
main()
  {
  os_framework_toolkit init_framework;
  os_streaming_toolkit init_streaming;

  try
    {
    // Create a thread pool with 3 threads.
    os_thread_pool tpool( 3, 3 );
    tpool.execute( new os_acronym_client( 7002, "Client1", "FAQ" ) );
    tpool.execute( new os_acronym_client( 7002, "Client2", "OMG" ) );
    tpool.execute( new os_acronym_client( 7002, "Client3", "FAQ" ) );
    tpool.execute( new os_acronym_client( 7002, "Client4", "ORB" ) );
    tpool.execute( new os_acronym_client( 7002, "Client5", "OMG" ) );
    tpool.execute( new os_acronym_client( 7002, "Client6", "FAQ" ) );

    while ( tpool.pending_tasks() != 0 )
      os_this_thread::sleep( 5 );
    }
  catch ( os_toolkit_error& error )
    {
    cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
    }

  return 0;
  }

Client1 sends: FAQ
Client1 receives: Frequently Asked Questions
Client2 sends: OMG
Client2 receives: Object Management Group
Client3 sends: FAQ
Client3 receives: Frequently Asked Questions
Client4 sends: ORB
Client4 receives: Object Request Broker
Client5 sends: OMG
Client5 receives: Object Management Group
Client6 sends: FAQ
Client6 receives: Frequently Asked Questions

Copyright©1994-2026 Recursion Software LLC
All Rights Reserved - For use by licensed users only.