io_multiplexer
Network<ToolKit> includes a class called os_io_multiplexer for multiplexing
socket operations on multiple ports. It can wait for three types of interesting
condition: read, write, and exception.
An os_io_multiplexer contains a set of socket descriptors that are considered
for each condition type. To set or access the descriptors that are scanned for
read, write, and exception conditions, use read_scan(), write_scan(), and
exception_scan().
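As a rough illustration of keeping one descriptor set per condition type, the
sketch below uses the POSIX fd_set type rather than os_io_multiplexer itself.
The scan_sets struct and the fill_fd_set() helper are hypothetical names
introduced here, and whether the toolkit stores its descriptors this way is an
assumption.

```cpp
#include <sys/select.h>
#include <vector>

// Hypothetical helper type, not part of Network<ToolKit>: one descriptor
// list per condition type, loosely mirroring read_scan(), write_scan(),
// and exception_scan().
struct scan_sets
{
    std::vector<int> read_scan;
    std::vector<int> write_scan;
    std::vector<int> exception_scan;
};

// Copy a descriptor list into an fd_set.
// Returns the highest descriptor stored, or -1 if the list is empty.
int fill_fd_set(const std::vector<int>& fds, fd_set& set)
{
    FD_ZERO(&set);
    int highest = -1;
    for (int fd : fds)
    {
        FD_SET(fd, &set);
        if (fd > highest)
            highest = fd;
    }
    return highest;
}
```

The highest descriptor is returned because the POSIX select() call takes the
highest descriptor plus one as its first argument.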
When you execute select() with an optional time-out parameter, select() blocks
the process until at least one interesting condition is detected or a time-out
occurs. select() then returns the number of descriptors with a pending
condition. To determine which descriptors are associated with the detected
conditions, use readable(), writable(), and exceptional().
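The wait-then-inspect pattern described above can be sketched with the POSIX
select() call, which reports a count and marks the ready descriptors in a
similar way. This is an analogy under stated assumptions, not the toolkit's
implementation: wait_readable() is a hypothetical helper, and it handles only
the readable condition.

```cpp
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>
#include <vector>

// Hypothetical helper: wait up to timeout_seconds for any descriptor in fds
// to become readable. Returns the number of ready descriptors (0 on
// time-out, -1 on error) and stores the ready descriptors in `ready`.
int wait_readable(const std::vector<int>& fds, int timeout_seconds,
                  std::vector<int>& ready)
{
    fd_set read_set;
    FD_ZERO(&read_set);
    int highest = -1;
    for (int fd : fds)
    {
        FD_SET(fd, &read_set);
        if (fd > highest)
            highest = fd;
    }

    timeval timeout;
    timeout.tv_sec = timeout_seconds;
    timeout.tv_usec = 0;

    // select() overwrites read_set so that only ready descriptors remain set.
    int count = select(highest + 1, &read_set, nullptr, nullptr, &timeout);

    ready.clear();
    if (count > 0)
    {
        for (int fd : fds)
            if (FD_ISSET(fd, &read_set))
                ready.push_back(fd);
    }
    return count;
}
```

A caller would treat a return value of 0 as a time-out and otherwise iterate
over `ready`, much as the example below iterates over the result of
readable().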
The following example spawns a thread to run server_func(), which creates a
collection of connection servers and an I/O multiplexer interested in the
readable condition. This I/O multiplexer detects connection requests on any of
the registered connection ports. If no connection request arrives within one
second, the server prints a message, and then loops and calls select() again.
The example then creates three additional threads: one thread to run
client_func1() and, three seconds later, two threads to run client_func2().
client_func1() connects to port 7001 on the server and client_func2() connects
to port 7003. The server uses select() to detect each connection request and
spawns a thread to service the connection. After spawning a thread for each
pending connection request, the server reenters select() and waits for the
next client. After all three client threads complete, the example terminates
the server and exits.
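To show the same server pattern without the toolkit, the sketch below
multiplexes several listening sockets with the POSIX socket API and accepts one
connection on each listener that has a pending request. The helper names
listen_on_loopback(), local_port(), and accept_pending() are hypothetical, and
the sketch accepts connections inline instead of handing them to a thread.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
#include <vector>

// Hypothetical helper: open a TCP listening socket on 127.0.0.1.
// Port 0 asks the operating system to choose a free port.
// Returns the descriptor, or -1 on failure.
int listen_on_loopback(unsigned short port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;
    sockaddr_in addr = {};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);
    if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof addr) < 0 ||
        listen(fd, 8) < 0)
    {
        close(fd);
        return -1;
    }
    return fd;
}

// Return the local port bound to a socket, or 0 on error.
unsigned short local_port(int fd)
{
    sockaddr_in addr = {};
    socklen_t len = sizeof addr;
    if (getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) < 0)
        return 0;
    return ntohs(addr.sin_port);
}

// Wait up to timeout_seconds for a connection request on any listener, then
// accept one connection per ready listener. An empty result means that the
// wait timed out or failed.
std::vector<int> accept_pending(const std::vector<int>& listeners,
                                int timeout_seconds)
{
    fd_set read_set;
    FD_ZERO(&read_set);
    int highest = -1;
    for (int fd : listeners)
    {
        FD_SET(fd, &read_set);
        if (fd > highest)
            highest = fd;
    }

    timeval timeout;
    timeout.tv_sec = timeout_seconds;
    timeout.tv_usec = 0;

    std::vector<int> accepted;
    if (select(highest + 1, &read_set, nullptr, nullptr, &timeout) > 0)
    {
        for (int fd : listeners)
        {
            if (!FD_ISSET(fd, &read_set))
                continue;               // No pending request on this listener.
            int connection = accept(fd, nullptr, nullptr);
            if (connection >= 0)
                accepted.push_back(connection);
        }
    }
    return accepted;
}
```

A listening socket counts as readable when a connection request is pending,
which is why the server in the example registers its connection servers for
the readable condition only.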
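#include <iostream>
#include <string>
#include <vector>
#include <ospace/network.h>
#include <ospace/stream.h>
#include <ospace/thread.h>
#include <ospace/uss/std/string.h>

// cout, endl, string, and vector are used unqualified below.
using namespace std;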
// Send a string over the socket and print the echoed response.
int
tcp_bounce( os_tcp_socket& socket, const char* name, const char* str )
{
  cout << name << " sends: " << str << endl;
  os_bstream stream( socket ); // Create a binary stream on the socket.
  stream << string( str ); // Send a string.
  string strng;
  stream >> strng; // Read the response.
  cout << name << " receives: " << strng << endl;
  return 0;
}
// Client thread that connects to port 7001.
void*
client_func1( void* )
{
  os_tcp_socket socket;
  socket.connect_to( os_socket_address( 7001 ) );
  return (void*) tcp_bounce( socket, "Client1", "hello" );
}

// Client thread that connects to port 7003.
void*
client_func2( void* )
{
  os_tcp_socket socket;
  socket.connect_to( os_socket_address( 7003 ) );
  return (void*) tcp_bounce( socket, "Client2", "goodbye" );
}
// Service one accepted connection: read a string and echo it back.
void*
transact( void* arg )
{
  os_tcp_socket* socket = (os_tcp_socket*) arg;
  os_bstream stream( *socket );
  string str;
  stream >> str;
  cout << "Server received: " << str << endl;
  cout << "Server sends: " << str << endl;
  stream << str;
  delete socket; // This thread owns the socket passed in by serve().
  return 0;
}
void
serve( vector< os_sock_t >& servers, os_io_multiplexer& mux )
{
  // Obtain vector of readable descriptors.
  vector< os_sock_t > readable = mux.readable();
  vector< os_sock_t >::iterator iter;
  cout << "Server detects " << readable.size()
       << " pending requests." << endl;
  // Iterate through all readable descriptors.
  for ( iter = readable.begin(); iter != readable.end(); ++iter )
  {
    // Service connection.
    os_tcp_connection_server server( *iter );
    server.auto_close( false );
    cout << "Server processing request on port ";
    cout << server.socket_address().port() << endl;
    os_tcp_socket* socket = new os_tcp_socket;
    if ( server.accept( *socket ) )
    {
      // transact() takes ownership of the socket and deletes it.
      os_thread dispatch( transact, socket );
    }
    else
    {
      delete socket; // Accept failed, so release the unused socket.
    }
  }
}
void*
server_func( void* )
{
  os_tcp_connection_server server1( os_socket_address( 7001 ) );
  os_tcp_connection_server server2( os_socket_address( 7002 ) );
  os_tcp_connection_server server3( os_socket_address( 7003 ) );
  // Build collection of all connection server descriptors.
  vector< os_sock_t > servers;
  servers.push_back( server1.descriptor() );
  servers.push_back( server2.descriptor() );
  servers.push_back( server3.descriptor() );
  // Build an I/O multiplexer to scan the descriptors.
  os_io_multiplexer mux;
  mux.read_scan( servers );
  // Loop, servicing a connection on any connection server.
  while( true )
  {
    int count = mux.select( 1 ); // Demonstrate timeout capability.
    if ( count == 0 )
      cout << "Time out" << endl;
    else
      serve( servers, mux );
  }
  // Not reached: the loop above has no exit condition.
  cout << "Server shutting down." << endl;
  return 0;
}
int
main()
{
  os_network_toolkit init_network;
  os_streaming_toolkit init_streaming;
  os_thread_toolkit init_thread;
  os_thread server( server_func );
  os_thread client1( client_func1 );
  os_this_thread::sleep( 3 ); // To force a mux timeout.
  os_thread client2( client_func2 ); // Spawn a second client thread.
  os_thread client3( client_func2 ); // Spawn a third client thread.
  os_this_thread::wait_for_thread( client1 );
  os_this_thread::wait_for_thread( client2 );
  os_this_thread::wait_for_thread( client3 );
  cout << "Both threads are done, terminating the server." << endl;
  return 0;
}
Client1 sends: hello
Server detects 1 pending requests.
Server processing request on port 7001
Server received: hello
Server sends: hello
Client1 receives: hello
Time out
Time out
Server detects 1 pending requests.
Server processing request on port 7003
Client2 sends: goodbye
Server detects 1 pending requests.
Server processing request on port 7003
Client2 sends: goodbye
Server received: goodbye
Server sends: goodbye
Server received: goodbye
Server sends: goodbye
Client2 receives: goodbye
Client2 receives: goodbye
Both threads are done, terminating the server.
Copyright©1994-2026 Recursion
Software LLC
All Rights Reserved - For use by licensed users only.