tcp_acceptor, tcp_connector
The Acceptor-Connector pattern decouples the responsibility of connection establishment and service initialization in a network application from the processing that the application is designed to perform. The benefit is a more flexible, more reusable application resulting in less maintenance.
The os_tcp_acceptor template
class extends the os_tcp_connection_server class.
The os_tcp_connection_server class allows accepting
incoming connections on a particular address. The os_tcp_acceptor
extends it by providing functionality for creating and executing service
handlers to serve the accepted connections. The type of the service handler, its
creation, and its execution strategies can be specified as template
instantiation parameters to the os_tcp_acceptor
class.
There are three template parameters. The first parameter, ServiceHandler,
specifies the service handler object type. A service handler object contains a
void* run() method that implements the functionality to service the client
after the connection is accepted. The second parameter, Executor, specifies the
concurrency strategy used to execute the ServiceHandler's void* run() method
after the connection is accepted. Refer to the classes os_same_thread_executor,
os_new_thread_executor, and os_thread_pool_executor for more information on the
concurrency strategies. Apart from these, users can define their own
concurrency strategy classes, such as a separate process or a process pool, and
use them as the Executor class. The third parameter, Creator, is a function
object that is used to create the ServiceHandler object. The default Creator is
os_creator, which creates a default-constructed ServiceHandler object on the
heap and returns it. The ServiceHandler object should delete itself after it is
done serving the client.
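The three roles can be illustrated without the toolkit. The following is a minimal sketch in standard C++, not the toolkit's implementation: greeting_handler, default_creator, same_thread_executor, deferred_executor, create_and_execute, and the execute() method name are all hypothetical stand-ins chosen for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Shared trace so the effect of each role is observable.
std::vector<std::string>& event_log() {
    static std::vector<std::string> log;
    return log;
}

// ServiceHandler role: provides void* run() and deletes itself when done.
class greeting_handler {
public:
    void* run() {
        event_log().push_back("handler ran");
        delete this;  // allocated on the heap by the creator
        return nullptr;
    }
};

// Creator role: a function object that returns a heap-allocated handler.
template <class Handler>
struct default_creator {
    Handler* operator()() const { return new Handler(); }
};

// Executor role: runs the handler immediately on the calling thread.
struct same_thread_executor {
    template <class Handler>
    void execute(Handler* handler) {
        assert(handler != nullptr);
        handler->run();
    }
};

// A user-defined strategy (hypothetical): queue handlers, run them later.
class deferred_executor {
public:
    template <class Handler>
    void execute(Handler* handler) {
        assert(handler != nullptr);
        pending_.push_back([handler] { handler->run(); });
    }
    // Runs every queued handler and returns how many were run.
    std::size_t flush() {
        std::vector<std::function<void()>> batch;
        batch.swap(pending_);
        for (auto& task : batch) task();
        return batch.size();
    }
private:
    std::vector<std::function<void()>> pending_;
};

// Ties the roles together: create a handler, then hand it to the executor.
template <class Handler, class Executor,
          class Creator = default_creator<Handler>>
void create_and_execute(Executor& executor, Creator creator = Creator()) {
    Handler* handler = creator();
    executor.execute(handler);
}
```

Calling create_and_execute<greeting_handler>() with either executor leaves the handler code unchanged; only the point at which run() is invoked differs.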
Connection requests are handled in the handle_accept() method. A ServiceHandler
object is created using the Creator function object. The created ServiceHandler
object should contain an os_tcp_socket object member, which is used to
establish the connection with the client socket. A ServiceHandler class can
obtain the os_tcp_socket member by inheriting from the os_handler_stream class,
or it can implement the functionality of os_handler_stream directly without
inheriting from it. After the connection is established, the ServiceHandler
object is passed to the Executor object for execution. The Executor object
calls the void* run() method of the ServiceHandler to initiate service
handling.
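The create, accept, execute sequence described above can be sketched as follows. This is an illustration under stated assumptions, not the toolkit's code: fake_socket and fake_listener replace real sockets with in-memory stand-ins, and mini_acceptor, upper_handler, upper_creator, inline_executor, and the socket() accessor are hypothetical names. Unlike a real accept, the simulated one returns immediately when nothing is pending.

```cpp
#include <cassert>
#include <cctype>
#include <deque>
#include <string>

// Hypothetical stand-in for a connected socket: carries the peer's bytes.
struct fake_socket {
    std::string inbound;
    bool connected = false;
};

// Hypothetical stand-in for a listening socket with queued requests.
class fake_listener {
public:
    void enqueue(const std::string& payload) { backlog_.push_back(payload); }
    // Connects the given socket to the next queued request, if any.
    bool accept_into(fake_socket& socket) {
        if (backlog_.empty()) return false;
        socket.inbound = backlog_.front();
        socket.connected = true;
        backlog_.pop_front();
        return true;
    }
private:
    std::deque<std::string> backlog_;
};

// Handler that owns its socket member and upper-cases what it receives.
class upper_handler {
public:
    explicit upper_handler(std::string* reply) : reply_(reply) {
        assert(reply_ != nullptr);
    }
    fake_socket& socket() { return socket_; }
    void* run() {
        for (char c : socket_.inbound)
            reply_->push_back(static_cast<char>(
                std::toupper(static_cast<unsigned char>(c))));
        delete this;  // created on the heap by the creator
        return nullptr;
    }
private:
    fake_socket socket_;
    std::string* reply_;
};

// Creator function object that passes state to each new handler.
struct upper_creator {
    std::string* reply;
    upper_handler* operator()() const { return new upper_handler(reply); }
};

struct inline_executor {
    template <class Handler>
    void execute(Handler* handler) { handler->run(); }
};

template <class Handler, class Executor, class Creator>
class mini_acceptor {
public:
    mini_acceptor(fake_listener& listener, Executor& executor, Creator creator)
        : listener_(listener), executor_(executor), creator_(creator) {}

    bool handle_accept() {
        Handler* handler = creator_();                    // 1. create
        if (!listener_.accept_into(handler->socket())) {  // 2. accept
            delete handler;  // nothing pending in this simulation
            return false;
        }
        executor_.execute(handler);                       // 3. execute
        return true;
    }
private:
    fake_listener& listener_;
    Executor& executor_;
    Creator creator_;
};

// Runs one accept cycle and returns what the handler produced.
std::string accept_and_serve(const std::string& request) {
    std::string reply;
    fake_listener listener;
    listener.enqueue(request);
    inline_executor executor;
    mini_acceptor<upper_handler, inline_executor, upper_creator>
        acceptor(listener, executor, upper_creator{&reply});
    acceptor.handle_accept();
    return reply;
}
```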
The application can wait for connection requests using the handle_accept()
method or through an os_dispatcher instance. A single-threaded application
could use the os_dispatcher class, and a multithreaded application could use a
separate thread that repeatedly calls handle_accept() to accept connections and
initiate service handlers.
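The multithreaded option, a dedicated thread that calls handle_accept() in a loop, might look like the sketch below. It uses std::thread and an in-memory request queue instead of the toolkit; request_queue, counting_acceptor, run_accept_thread, and the shutdown_request sentinel are assumptions made so that the loop can terminate cleanly in a demonstration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical blocking source of connection requests.
class request_queue {
public:
    void push(int id) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            queue_.push(id);
        }
        ready_.notify_one();
    }
    // Blocks until a request is available, then returns it.
    int wait_and_pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !queue_.empty(); });
        int id = queue_.front();
        queue_.pop();
        return id;
    }
private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<int> queue_;
};

// Sentinel used only by this sketch to stop the accept loop.
const int shutdown_request = -1;

class counting_acceptor {
public:
    explicit counting_acceptor(request_queue& requests)
        : requests_(requests) {}
    // Blocks for one request; returns false once shutdown is requested.
    bool handle_accept() {
        int id = requests_.wait_and_pop();
        if (id == shutdown_request) return false;
        ++served_;  // a real acceptor would execute a service handler here
        return true;
    }
    int served() const { return served_; }
private:
    request_queue& requests_;
    int served_ = 0;
};

// Runs the accept loop on its own thread while the caller submits requests.
int run_accept_thread(int connection_count) {
    assert(connection_count >= 0);
    request_queue requests;
    counting_acceptor acceptor(requests);
    std::thread accept_thread([&acceptor] {
        while (acceptor.handle_accept()) {
        }
    });
    for (int id = 0; id < connection_count; ++id) requests.push(id);
    requests.push(shutdown_request);
    accept_thread.join();  // join makes served() safe to read afterwards
    return acceptor.served();
}
```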
The os_tcp_connector class
implements the strategy for initiating a connection and executing a connect
handler when the connection is established. The connection can be initiated
synchronously or asynchronously. The class os_dispatcher
is used for asynchronous connection completion notifications.
The connect() method is used to connect to a server socket with a particular
address. If the connection mode is os_tcp_connector::sync, the calling thread
blocks until the connection is established. If the connection mode is
os_tcp_connector::async, the calling thread does not block if the connection
cannot be made immediately. Instead, an internal connection completion handler
is registered with the dispatcher, which is notified when the connection
completes.
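The difference between the two modes is mainly one of ordering: in synchronous mode the handler has run by the time connect() returns, while in asynchronous mode it runs later, when the dispatcher delivers the completion. The sketch below simulates that ordering without sockets. mini_dispatcher, mini_connector, connect_mode, and trace_connect are hypothetical names, and for simplicity the asynchronous path always defers, whereas the text above says deferral happens only when the connection cannot be made immediately.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

enum class connect_mode { sync, async };

// Hypothetical dispatcher: stores completion callbacks and fires them when
// dispatch() is called, standing in for an event loop.
class mini_dispatcher {
public:
    void register_completion(std::function<void()> on_connected) {
        assert(on_connected != nullptr);
        pending_.push_back(std::move(on_connected));
    }
    // Delivers all pending completions; returns how many were delivered.
    std::size_t dispatch() {
        std::vector<std::function<void()>> ready;
        ready.swap(pending_);
        for (auto& callback : ready) callback();
        return ready.size();
    }
private:
    std::vector<std::function<void()>> pending_;
};

class mini_connector {
public:
    mini_connector(connect_mode mode, mini_dispatcher& dispatcher)
        : mode_(mode), dispatcher_(dispatcher) {}

    // Returns true if the handler has already run when connect() returns.
    bool connect(std::function<void()> handler) {
        if (mode_ == connect_mode::sync) {
            handler();  // connection "established" before returning
            return true;
        }
        dispatcher_.register_completion(std::move(handler));
        return false;
    }
private:
    connect_mode mode_;
    mini_dispatcher& dispatcher_;
};

// Records the order in which events happen for the given mode.
std::vector<std::string> trace_connect(connect_mode mode) {
    std::vector<std::string> trace;
    mini_dispatcher dispatcher;
    mini_connector connector(mode, dispatcher);
    connector.connect([&trace] { trace.push_back("handler"); });
    trace.push_back("connect returned");
    dispatcher.dispatch();  // no-op in sync mode
    return trace;
}
```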
The connect() method is a member template function that takes two template
arguments. The first template argument, ConnectHandler, specifies the type of
the service handler object that is executed upon connection establishment. To
establish the connection, it should contain an os_tcp_socket object member. The
ConnectHandler class can obtain this member by inheriting from the
os_handler_stream class. Alternatively, the os_handler_stream functionality can
be implemented directly in the ConnectHandler class without inheriting from
os_handler_stream. The second template argument, Executor, specifies the type
of the concurrency strategy object used to execute the ConnectHandler object's
void* run() method after the connection is established. Refer to the classes
os_same_thread_executor, os_new_thread_executor, and os_thread_pool_executor
for more information on the concurrency strategies. Apart from these, users can
define their own concurrency strategy classes, such as a separate process or a
process pool, and use them as the executor class. The void* run() method of the
ConnectHandler should implement the service offered or requested once the
connection is established.
The os_tcp_acceptor, os_tcp_connector, and os_dispatcher classes can be used
together to establish connections synchronously or asynchronously and to
perform service processing at the two connection endpoints.
Because connect() is a member template, the handler and executor classes do not have to inherit from a specific interface class; they only have to provide the expected members.
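To make the member template point concrete, the sketch below passes two unrelated handler types to the same connect() member template. Neither type derives from a common base; each only supplies a socket accessor and a void* run() method. Everything here (fake_socket, ping_handler, status_handler, inline_executor, mini_connector, the socket() accessor, and the integer port parameter) is a hypothetical stand-in, and the handlers live on the stack, so they do not delete themselves.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for a TCP socket.
struct fake_socket {
    int port = 0;
    bool connected = false;
};

// First handler type: reports the port it was connected to.
class ping_handler {
public:
    fake_socket& socket() { return socket_; }
    void* run() {
        result = "ping:" + std::to_string(socket_.port);
        return nullptr;
    }
    std::string result;
private:
    fake_socket socket_;
};

// Second, unrelated handler type: reports whether it is connected.
class status_handler {
public:
    fake_socket& socket() { return socket_; }
    void* run() {
        ok = socket_.connected;
        return nullptr;
    }
    bool ok = false;
private:
    fake_socket socket_;
};

struct inline_executor {
    template <class Handler>
    void execute(Handler* handler) { handler->run(); }
};

class mini_connector {
public:
    // Member template: any type with socket() and run() is accepted, and any
    // type with execute() can act as the executor.
    template <class ConnectHandler, class Executor>
    void connect(ConnectHandler* handler, int port, Executor* executor) {
        assert(handler != nullptr && executor != nullptr);
        fake_socket& socket = handler->socket();
        socket.port = port;       // simulated connection establishment
        socket.connected = true;
        executor->execute(handler);
    }
};

// Connects both handler types through the same connector.
std::string connect_both(int port) {
    mini_connector connector;
    inline_executor executor;
    ping_handler ping;
    status_handler status;
    connector.connect(&ping, port, &executor);
    connector.connect(&status, port, &executor);
    return ping.result + (status.ok ? " ok" : " failed");
}
```

A type that lacks socket() or run() fails at compile time when connect() is instantiated, so the interface is checked without a base class.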
In the following example, a simple iterative server is implemented using the
os_tcp_acceptor class. The example consists of two parts: a server
(acceptor1s.cpp) and a client (acceptor1c.cpp). The server must be started
before the client. On the server side, the os_echo_acceptor object accepts
connection requests on port 7001. The main program loops, calling the
acceptor's handle_accept() member function to accept and serve incoming
connections. In the handle_accept() method, the acceptor object creates an
os_echo_server service handler object, waits for a connection request, accepts
it, and then executes the echo server handler object to service the established
connection. The echo server handler object is executed in the acceptor's thread
of control using the os_same_thread_executor concurrency strategy. In its void*
run() method, the echo server handler object reads a message on its socket
stream and echoes it back before exiting. The echo server object has to delete
itself after servicing the client, since it is created by the acceptor on the
heap. On the client side, two os_echo_client handler objects are created and
connected to the server synchronously using the os_tcp_connector object. Upon
connection establishment, the TCP connector object executes the handlers in its
own thread of control using the os_same_thread_executor concurrency strategy.
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>
class os_echo_server : public os_handler_stream
{
public:
void* run();
};
void*
os_echo_server::run()
{
try
{
os_bstream stream( socket_ );
string message;
stream >> message;
cout << "Server received: " << message << endl;
cout << "Server sends: " << message << endl;
stream << message;
}
catch( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
}
delete this; // Created by the acceptor.
return 0;
}
typedef os_tcp_acceptor
<
os_echo_server,
os_same_thread_executor,
os_creator< os_echo_server >
>
os_echo_acceptor;
int
main()
{
os_framework_toolkit init_framework;
os_streaming_toolkit init_streaming;
try
{
// Run in the same thread as the server.
os_same_thread_executor executor;
os_echo_acceptor server( os_socket_address( 7001 ), &executor );
cout << "Starting the echo server, type Ctrl-C to exit" << endl;
while ( true )
server.handle_accept();
}
catch ( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
}
return 0;
}
Starting the echo server, type Ctrl-C to exit
Server received: hello
Server sends: hello
Server received: goodbye
Server sends: goodbye
^C
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>
class os_echo_client : public os_handler_stream
{
public:
os_echo_client( const string& name, const string& message );
void* run();
private:
string name_;
string message_;
};
os_echo_client::os_echo_client
(
const string& name,
const string& message
) :
name_( name ),
message_( message )
{
}
void*
os_echo_client::run()
{
try
{
cout << name_ << " sends: " << message_ << endl;
os_bstream stream( socket_ ); // Create a binary stream on the socket.
stream << message_;
string response;
stream >> response;
cout << name_ << " receives: " << response << endl;
}
catch( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
}
delete this; // Created on the heap in main(); released on success and on error.
return 0;
}
int
main()
{
os_framework_toolkit init_framework;
os_streaming_toolkit init_streaming;
try
{
// Create clients.
os_echo_client* client1 = new os_echo_client( "Client1", "hello" );
os_echo_client* client2 = new os_echo_client( "Client2", "goodbye" );
// Connect synchronously.
os_tcp_connector connector;
os_same_thread_executor executor;
connector.connect( client1, os_socket_address(7001), &executor );
connector.connect( client2, os_socket_address(7001), &executor );
}
catch ( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
}
return 0;
}
Client1 sends: hello
Client1 receives: hello
Client2 sends: goodbye
Client2 receives: goodbye
The following example implements a simple concurrent server. It is the same as
the previous example, except that the os_new_thread_executor concurrency
strategy is used instead of the os_same_thread_executor concurrency strategy
and the server listens on port 7002. The os_echo_server handlers on the server
side and the os_echo_client handlers on the client side each execute in their
own thread of control. Externalizing the concurrency strategy makes it simple
to change the execution strategy from a simple iterative one to any of several
concurrent approaches, such as separate threads, thread pools, separate
processes, or process pools.
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/thread.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>
class os_echo_server : public os_handler_stream
{
public:
void* run();
};
void*
os_echo_server::run()
{
try
{
os_bstream stream( socket_ );
string message;
stream >> message;
cout << "Server received: " << message << endl;
cout << "Server sends: " << message << endl;
stream << message;
}
catch( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
}
delete this; // Created by the acceptor.
return 0;
}
typedef os_tcp_acceptor
<
os_echo_server,
os_new_thread_executor,
os_creator< os_echo_server >
>
os_echo_acceptor;
int
main()
{
os_framework_toolkit init_framework;
os_streaming_toolkit init_streaming;
try
{
// Run each handler in a separate thread.
os_new_thread_executor executor;
os_echo_acceptor server( os_socket_address( 7002 ), &executor );
cout << "Starting the echo server, type Ctrl-C to exit" << endl;
while ( true )
server.handle_accept();
}
catch ( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
}
return 0;
}
Starting the echo server, type Ctrl-C to exit
Server received: hello
Server sends: hello
Server received: goodbye
Server sends: goodbye
^C
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/thread.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>
class os_echo_client : public os_handler_stream
{
public:
os_echo_client( const string& name, const string& message );
void* run();
private:
string name_;
string message_;
};
os_echo_client::os_echo_client
(
const string& name,
const string& message
) :
name_( name ),
message_( message )
{
}
void*
os_echo_client::run()
{
try
{
cout << name_ << " sends: " << message_ << endl;
os_bstream stream( socket_ ); // Create a binary stream on the socket.
stream << message_;
string response;
stream >> response;
cout << name_ << " receives: " << response << endl;
}
catch( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
}
delete this; // Created on the heap in main(); released on success and on error.
return 0;
}
int
main()
{
os_framework_toolkit init_framework;
os_streaming_toolkit init_streaming;
try
{
// Create clients.
os_echo_client* client1 = new os_echo_client( "Client1", "hello" );
os_echo_client* client2 = new os_echo_client( "Client2", "goodbye" );
// Connect synchronously.
os_tcp_connector connector;
os_new_thread_executor executor;
connector.connect( client1, os_socket_address(7002), &executor );
connector.connect( client2, os_socket_address(7002), &executor );
os_this_thread::sleep( 5 ); // Give the handler threads time to finish before main() returns.
}
catch ( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
}
return 0;
}
Client1 sends: hello
Client2 sends: goodbye
Client1 receives: hello
Client2 receives: goodbye
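The only difference between the two echo examples above is the executor type. The sketch below shows the same idea in standard C++: one handler type, two interchangeable executors, and a check of which thread actually ran the handler. The names thread_recorder, same_thread_executor, new_thread_executor, wait(), and handler_ran_on_caller_thread are hypothetical, and this new_thread_executor joins its threads, which is an assumption of the sketch rather than a statement about os_new_thread_executor.

```cpp
#include <cassert>
#include <thread>
#include <vector>

// Handler that records which thread executed it.
struct thread_recorder {
    std::thread::id ran_on;
    void* run() {
        ran_on = std::this_thread::get_id();
        return nullptr;
    }
};

// Iterative strategy: run the handler on the calling thread.
struct same_thread_executor {
    template <class Handler>
    void execute(Handler* handler) {
        assert(handler != nullptr);
        handler->run();
    }
    void wait() {}  // nothing to wait for
};

// Concurrent strategy: run each handler on a thread of its own.
class new_thread_executor {
public:
    template <class Handler>
    void execute(Handler* handler) {
        assert(handler != nullptr);
        workers_.emplace_back([handler] { handler->run(); });
    }
    // Blocks until every started handler has finished.
    void wait() {
        for (auto& worker : workers_) worker.join();
        workers_.clear();
    }
    ~new_thread_executor() { wait(); }
private:
    std::vector<std::thread> workers_;
};

// The calling code is identical for both strategies; only the template
// argument changes.
template <class Executor>
bool handler_ran_on_caller_thread() {
    Executor executor;
    thread_recorder handler;
    executor.execute(&handler);
    executor.wait();  // after this, reading handler.ran_on is safe
    return handler.ran_on == std::this_thread::get_id();
}
```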
The following example illustrates defining a custom Creator function object and running several acceptors on one server socket.
The example uses several acronym server and acronym client objects. On the
server side (acceptor4s.cpp), the main() function creates an
os_acronym_acceptor object bound to port 7002. Two more acronym acceptor
objects are then created using the server socket descriptor of the first
acronym acceptor, so three acronym acceptor objects monitor the same socket for
incoming connections. The three acceptors are executed in three different
threads. Each thread executes the acceptor's void* run() method to accept and
service connections. Since more than one thread will be calling the accept()
system call at the same time on the same server socket descriptor, the accept()
system call must be serialized for performance reasons and for portability
across different platforms. Hence, the acceptor objects are constructed with
the lock accept call flag set to true (the fifth argument of the constructor).
After accepting connections, the acceptor objects execute the acronym server
handler objects using the same thread pool executor object. On the client side
(acceptor4c.cpp), several acronym client objects are created and enqueued to a
thread pool. Each acronym client, when executed, connects to the server on port
7002 and gets the value for a particular acronym.
#include <iostream>
#include <map>
#include <string>
#include <ospace/framework.h>
#include <ospace/thread.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>
typedef map< string, string, less< string > > map_string;
class os_acronym_server : public os_handler_stream
{
public:
os_acronym_server( const map_string& elements );
void* run();
private:
map_string elements_;
};
os_acronym_server::os_acronym_server( const map_string& elements ) :
elements_( elements )
{
}
void*
os_acronym_server::run()
{
try
{
os_bstream stream( socket_ );
string acronym;
stream >> acronym;
cout << "Server received: " << acronym << endl;
cout << "Server sends: " << elements_[ acronym ] << endl;
stream << elements_[ acronym ];
}
catch( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
}
delete this; // Created by the acceptor.
return 0;
}
class os_acronym_server_creator
{
public:
os_acronym_server_creator( const map_string* elements = 0 );
os_acronym_server* operator()() const;
private:
const map_string* elements_;
};
os_acronym_server_creator::os_acronym_server_creator
(
const map_string* elements
) :
elements_( elements )
{
}
os_acronym_server*
os_acronym_server_creator::operator()() const
{
return new os_acronym_server( *elements_ );
}
typedef os_tcp_acceptor
<
os_acronym_server,
os_thread_pool_executor,
os_acronym_server_creator
>
os_acronym_acceptor;
int
main()
{
os_framework_toolkit init_framework;
os_streaming_toolkit init_streaming;
try
{
map_string acronyms;
acronyms[ "ORB" ] = "Object Request Broker";
acronyms[ "FAQ" ] = "Frequently Asked Questions";
acronyms[ "OMG" ] = "Object Management Group";
os_acronym_server_creator creator( &acronyms );
os_thread_pool_executor thread_pool;
// Create multiple servers listening on the same port no.
os_acronym_acceptor server1
(
os_socket_address( 7002 ),
&thread_pool,
0, // No dispatcher.
creator,
true // Lock accept call.
);
os_acronym_acceptor server2
(
server1.descriptor(),
&thread_pool,
0, // No dispatcher.
creator,
true // Lock accept call.
);
os_acronym_acceptor server3
(
server1.descriptor(),
&thread_pool,
0, // No dispatcher.
creator,
true // Lock accept call.
);
cout << "Starting the acronym servers, type Ctrl-C to exit" << endl;
os_thread thread1( &server1 );
os_thread thread2( &server2 );
os_thread thread3( &server3 );
os_this_thread::wait_for_any_thread();
thread_pool.wait_for_completion();
}
catch ( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
}
return 0;
}
Starting the acronym servers, type Ctrl-C to exit
Server received: FAQ
Server sends: Frequently Asked Questions
Server received: OMG
Server sends: Object Management Group
Server received: FAQ
Server sends: Frequently Asked Questions
Server received: ORB
Server sends: Object Request Broker
Server received: OMG
Server sends: Object Management Group
Server received: FAQ
Server sends: Frequently Asked Questions
^C
#include <iostream>
#include <string>
#include <ospace/framework.h>
#include <ospace/thread.h>
#include <ospace/stream.h>
#include <ospace/uss/std/string.h>
class os_acronym_client
{
public:
os_acronym_client
(
int port,
const string& name,
const string& acronym
);
void* run();
private:
int port_;
string name_;
string acronym_;
os_tcp_socket socket_;
};
os_acronym_client::os_acronym_client
(
int port,
const string& name,
const string& acronym
) :
port_( port ),
name_( name ),
acronym_( acronym )
{
}
void*
os_acronym_client::run()
{
try
{
socket_.connect_to( os_socket_address( port_ ) );
cout << name_ << " sends: " << acronym_ << endl;
os_bstream stream( socket_ ); // Create a binary stream on the socket.
stream << acronym_;
string response;
stream >> response;
cout << name_ << " receives: " << response << endl;
delete this;
}
catch( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
delete this;
}
return 0;
}
int
main()
{
os_framework_toolkit init_framework;
os_streaming_toolkit init_streaming;
try
{
// Create a thread pool with 3 threads.
os_thread_pool tpool( 3, 3 );
tpool.execute( new os_acronym_client( 7002, "Client1", "FAQ" ) );
tpool.execute( new os_acronym_client( 7002, "Client2", "OMG" ) );
tpool.execute( new os_acronym_client( 7002, "Client3", "FAQ" ) );
tpool.execute( new os_acronym_client( 7002, "Client4", "ORB" ) );
tpool.execute( new os_acronym_client( 7002, "Client5", "OMG" ) );
tpool.execute( new os_acronym_client( 7002, "Client6", "FAQ" ) );
while ( tpool.pending_tasks() != 0 )
os_this_thread::sleep( 5 );
}
catch ( os_toolkit_error& error )
{
cerr << "\nCaught os_toolkit_error: \n" << '\t' << error << endl;
}
return 0;
}
Client1 sends: FAQ
Client1 receives: Frequently Asked Questions
Client2 sends: OMG
Client2 receives: Object Management Group
Client3 sends: FAQ
Client3 receives: Frequently Asked Questions
Client4 sends: ORB
Client4 receives: Object Request Broker
Client5 sends: OMG
Client5 receives: Object Management Group
Client6 sends: FAQ
Client6 receives: Frequently Asked Questions
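The serialized accept used by the three acronym acceptors can be sketched in standard C++ as follows. Three threads share one listening endpoint and take a common lock around the accept step only, so each pending connection is accepted by exactly one thread while servicing proceeds outside the lock. shared_listener, locking_acceptor, and accept_on_three_threads are hypothetical names, and the in-memory listener is deliberately not thread-safe, so in this sketch the lock is required for correctness rather than only for performance and portability.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical shared listening endpoint; not thread-safe on its own.
class shared_listener {
public:
    explicit shared_listener(int pending) {
        assert(pending >= 0);
        for (int id = 0; id < pending; ++id) backlog_.push(id);
    }
    // Returns the next connection id, or -1 when nothing is pending.
    int accept() {
        if (backlog_.empty()) return -1;
        int id = backlog_.front();
        backlog_.pop();
        return id;
    }
private:
    std::queue<int> backlog_;
};

// Acceptor that serializes accept() with a lock shared by all acceptors.
class locking_acceptor {
public:
    locking_acceptor(shared_listener& listener, std::mutex& accept_lock)
        : listener_(listener), accept_lock_(accept_lock) {}

    // Accepts until the backlog is empty; intended to run on its own thread.
    void run() {
        for (;;) {
            int id;
            {
                // Hold the lock only while accepting.
                std::lock_guard<std::mutex> guard(accept_lock_);
                id = listener_.accept();
            }
            if (id < 0) return;
            served_.push_back(id);  // servicing happens outside the lock
        }
    }
    std::size_t served_count() const { return served_.size(); }
private:
    shared_listener& listener_;
    std::mutex& accept_lock_;
    std::vector<int> served_;
};

// Runs three acceptors on one listener and returns the total accepted.
std::size_t accept_on_three_threads(int pending) {
    shared_listener listener(pending);
    std::mutex accept_lock;
    locking_acceptor first(listener, accept_lock);
    locking_acceptor second(listener, accept_lock);
    locking_acceptor third(listener, accept_lock);
    std::thread t1([&first] { first.run(); });
    std::thread t2([&second] { second.run(); });
    std::thread t3([&third] { third.run(); });
    t1.join();
    t2.join();
    t3.join();
    return first.served_count() + second.served_count() +
           third.served_count();
}
```

How the connections are split among the three acceptors varies from run to run; only the total is deterministic.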
Copyright © 1994-2026 Recursion Software LLC
All Rights Reserved - For use by licensed users only.