summaryrefslogtreecommitdiff
path: root/tests/CLASSIX/CLASSIX_CLD_Connector_Test.cpp
diff options
context:
space:
mode:
authorwchiang <wchiang@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-24 10:40:56 +0000
committerwchiang <wchiang@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-24 10:40:56 +0000
commite22e3f9f37205e8fcf21e13d7ce338b03f97205a (patch)
tree022a3c1af08af17df883476c3486744c9f14e21b /tests/CLASSIX/CLASSIX_CLD_Connector_Test.cpp
parent49b1b7f6f5852151db99b8bcf76a5cc3458005e4 (diff)
downloadATCD-e22e3f9f37205e8fcf21e13d7ce338b03f97205a.tar.gz
*** empty log message ***
Diffstat (limited to 'tests/CLASSIX/CLASSIX_CLD_Connector_Test.cpp')
-rw-r--r--tests/CLASSIX/CLASSIX_CLD_Connector_Test.cpp392
1 files changed, 392 insertions, 0 deletions
diff --git a/tests/CLASSIX/CLASSIX_CLD_Connector_Test.cpp b/tests/CLASSIX/CLASSIX_CLD_Connector_Test.cpp
new file mode 100644
index 00000000000..c100b768caf
--- /dev/null
+++ b/tests/CLASSIX/CLASSIX_CLD_Connector_Test.cpp
@@ -0,0 +1,392 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// tests
+//
+// = FILENAME
+// CLD_Connector.cpp
+//
+// = DESCRIPTION
+// Based on $ACE_ROOT/tests/MT_SOCK.cpp
+//
+// This is a multi-threaded torture test of the ACE_CLASSIX_CLD_Connector
+// class.
+//
+// The test spawns a server and multiple clients allowing clients to
+// exchange data with the server.
+//
+// This example demonstrates the following use cases
+// - server is a subclass of ACE_Svc_Handler,
+// but it does not involve an acceptor class.
+// - server multicasts message to the clients
+// - client uses ACE_CLASSIX_Connector to "connect" to the server
+//
+// ============================================================================
+
+#include "ace/OS.h"
+#include "ace/Thread.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Handle_Set.h"
+#include "ace/Svc_Handler.h"
+
+#include "ace/CLASSIX/CLASSIX_Select_Reactor.h"
+#include "ace/CLASSIX/CLASSIX_Stream.h"
+#include "ace/CLASSIX/CLASSIX_Dgram_Mcast.h"
+#include "ace/CLASSIX/CLASSIX_Group_Stamp.h"
+#include "ace/CLASSIX/CLASSIX_CLD_Connector.h"
+
+#include "CLASSIX_test_config.h"
+
+#define MAX_TEST_CLIENTS 30
+#define TEST_STAMP 300
+#define ACE_CLASSIX_MCAST_DGRAM ACE_CLASSIX_Dgram_Mcast, ACE_CLASSIX_Group
+
+static ACE_Atomic_Op<ACE_Thread_Mutex, u_int> client_id = 0;
+
+
+struct client_arg
+{
+ ACE_Barrier *wait;
+ ACE_CLASSIX_Port_Core *server;
+};
+
+struct client_data
+{
+ u_int me;
+ char c;
+};
+
+class server_handler : public ACE_Svc_Handler<ACE_CLASSIX_MCAST_DGRAM,
+ ACE_MT_SYNCH>
+{
+public:
+ server_handler(int /* stamp */, const ACE_CLASSIX_Port_Core&);
+ // port that the server uses to receive data from all sorts of clients
+ virtual int open (void * = 0 /* args */);
+ virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
+ virtual int svc (void);
+
+ // = Demultiplexing hooks for Reactor
+ virtual int handle_input (ACE_HANDLE);
+ // The first int of each message identifies the sender
+
+private:
+ ACE_Atomic_Op<ACE_Thread_Mutex, u_int> in_svc_;
+ // 1 if svc() is running
+};
+
+server_handler::server_handler(int theStamp,
+ const ACE_CLASSIX_Port_Core & thePort)
+ : ACE_Svc_Handler<ACE_CLASSIX_MCAST_DGRAM, ACE_MT_SYNCH> (),
+ in_svc_ (0)
+{
+ if (this->peer().set_saps(theStamp, thePort) != 0)
+ ACE_DEBUG((LM_DEBUG, "failed to set up IO stream \n"));
+}
+
+int
+server_handler::open (void *)
+{
+ ACE_DEBUG((LM_DEBUG, "server_handler::open()\n"));
+ if (reactor ()->register_handler (this, READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t)server_handler:: cannot register handler\n"),
+ -1);
+
+ if (this->peer().selectable() != 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) failed to make Server's port %d selectable\n",
+ get_handle ()));
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%t) created svc_handler for handle %d\n",
+ get_handle ()));
+
+ if (this->peer().control(K_BROADMODE) != 0)
+ ACE_DEBUG((LM_DEBUG, "(%t) %p\n"
+ "server cannot send data to destination\n"));
+
+ return this->activate (THR_BOUND);
+}
+
+int
+server_handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+{
+ while (this->in_svc_ == 1)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) server is closing down\n"));
+
+ // Use a blank message to tell the thread to stop
+ ACE_Message_Block *mb = new ACE_Message_Block((size_t) 0);
+ ACE_Time_Value timeout(0, 10*1000); // wait for at most 10 usec
+ this->putq(mb, &timeout);
+ ACE_OS::thr_yield();
+ }
+
+ this->destroy();
+}
+
+int
+server_handler::handle_input(ACE_HANDLE)
+{
+ // Get input as fast as it can to free the reactor to handle other work.
+ size_t n = 0;
+ if (ACE_Reactor::instance()->
+ current_info(this->get_handle(),n) == -1)
+ ACE_ERROR_RETURN((LM_ERROR, "???(%t) failed to get input size\n"), -1);
+
+ ACE_Message_Block *msg;
+ ACE_NEW_RETURN(msg, ACE_Message_Block(n == 0 ? 1 : n), -1);
+ if (this->peer().recv(msg->wr_ptr(), n) == -1)
+ ACE_ERROR_RETURN((LM_ERROR, "???(%t) %p\n", "get_data()"), -1);
+ msg->wr_ptr(n);
+
+
+ if (this->putq(msg) == -1)
+ ACE_ERROR_RETURN((LM_ERROR, "???(%t) failed to enqueue message\n"),
+ -1);
+ return 0;
+}
+
+int
+server_handler::svc(void)
+{
+ this->peer().open_writer();
+ this->in_svc_ = 1;
+
+ ACE_Message_Block *mb = 0;
+ int result = 0;
+ char *storage[MAX_TEST_CLIENTS];
+
+ // initialize the expected result per client
+ for (int i = 0; i < MAX_TEST_CLIENTS; i++)
+ storage[i] = ACE_ALPHABET;
+
+ int len = sizeof (client_data);
+ int total_clients = MAX_TEST_CLIENTS;
+
+ ACE_DEBUG((LM_DEBUG, "(%t)server is waiting for clients\n"));
+ // read input
+ for (;;)
+ {
+ // wait until a message has arrived
+ result = this->getq (mb);
+
+ if (result == -1)
+ {
+ ACE_ERROR_RETURN((LM_ERROR,
+ "???(%t), error while waiting for a message "
+ "on the queue\n"), 0);
+ }
+
+ int length = mb->length ();
+
+ u_int client = 0;
+ if (length != 0 && length == len)
+ {
+ client = *((u_int*) mb->rd_ptr ());
+ mb->rd_ptr(sizeof (u_int));
+ char* data = storage[client];
+ // Check if the client has done
+ if (*(mb->rd_ptr()) == '\0')
+ {
+ ACE_DEBUG((LM_DEBUG, "handshake with client %d\n", client));
+ client_data response;
+ response.me = client;
+ int r = this->peer().send_n(&response, len);
+ if (r != len)
+ ACE_ERROR((LM_ERROR,
+ "(%t):%d %p\n",r,
+ "server faided to send handshake msg"));
+ total_clients--;
+ if (total_clients == 0)
+ {
+ mb->release();
+ ACE_DEBUG((LM_DEBUG, "(%t) end event loop \n"));
+ ACE_Reactor::end_event_loop();
+ this->in_svc_ = 0;
+ break;
+ }
+ }
+ if (*data != *(mb->rd_ptr()))
+ ACE_ERROR((LM_ERROR, "???(%t), invalid input\n"));
+ storage[client] = ++data;
+ }
+ else if (length > 0)
+ {
+ ACE_ERROR((LM_ERROR, "???(%t), invalid input length(%d)\n",
+ length));
+ }
+ mb->release();
+
+ }
+
+ return 0;
+}
+
+
+static void *
+client (void *arg)
+{
+ client_data info;
+ info.me = client_id++;
+ int info_len = sizeof (client_data);
+
+ client_arg *data = (client_arg*) arg;
+ ACE_CLASSIX_Port_Core *server_port = data->server;
+ ACE_CLASSIX_Port server_addr(*server_port);
+ ACE_Barrier *barrier = data->wait;
+ //===================================================================
+ // Stream & Connector
+ //
+// ACE_CLASSIX_Stream cli_stream(*server_port);
+ ACE_CLASSIX_Stream cli_stream;
+ // create a stream where the local SAP uses the actor's default port.
+
+ ACE_CLASSIX_CLD_Connector con;
+ // create a connector for the stream
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Connecting local and peer SAPs\n"));
+ // Connect local and peer SAPs.
+
+ barrier->wait();
+ //===================================================================
+ // Attempt a connect to the server...
+ // A local port will be created as a local SAP
+ if (con.connect (cli_stream, server_addr) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) %p\n",
+ "connection failed"),
+ 0);
+ }
+ ACE_CLASSIX_Port client_addr;
+ if (cli_stream.local_sap().get_addr (client_addr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) %p\n", "get_local_addr"), 0);
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) connected client at %d\n",
+ client_addr.get_handle ()));
+
+
+ //===================================================================
+ // Insert the local SAP to the test group
+ ACE_CLASSIX_Group_Stamp group(TEST_STAMP);
+ // group that the client's port is in
+ if (group.insert(&client_addr) == -1)
+ ACE_ERROR_RETURN((LM_ERROR,
+ "Failed to insert local SAP of client %d in to the"
+ "group \n"), -1);
+
+ //===================================================================
+ // Do not use Reactor, so disable local port from being monitored
+ int result = cli_stream.unselectable();
+ if (result != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) failed to disable local port(%d)\n",
+ result), -1);
+
+ //===================================================================
+ // Send data to server (correctly handles "incomplete writes").
+ char *c = ACE_ALPHABET;
+
+ do
+ {
+ ACE_OS::thr_yield();
+ info.c = *c;
+ if (cli_stream.send_n (&info, info_len) != info_len)
+ ACE_ERROR ((LM_ERROR, "(%P|%t) %p\n", "send_n"));
+ }while(*c++ != '\0');
+
+ //===================================================================
+ // Close writer
+ // ACE_DEBUG ((LM_DEBUG, "(%P|%t) closing writer\n"));
+ // Explicitly close the writer-side of the connection.
+ //if (cli_stream.close_writer () == -1)
+ // ACE_ERROR ((LM_ERROR, "(%P|%t) %p\n", "close_writer"));
+
+ // Wait for handshake with server.
+ client_data response;
+ do
+ {
+ if (cli_stream.ipcRecv_n (&response, info_len) != info_len)
+ ACE_ERROR ((LM_ERROR, "(%P|%t) %p\n", "recv_n"));
+ }
+ while (response.me != info.me);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) received handshake from server\n"));
+
+ // Close the connection completely.
+ if (cli_stream.close () == -1)
+ ACE_ERROR ((LM_ERROR, "(%P|%t) %p\n", "close"));
+
+ return 0;
+}
+
+
+static void
+spawn (ACE_CLASSIX_Port_Core* theServer)
+{
+ // create a port for the server
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) starting server at port %d\n",
+ theServer->get_handle ()));
+
+ // activate server
+ server_handler handler(TEST_STAMP, *theServer);
+ handler.open(); // make the server active
+
+ // activate clients
+ // make sure
+ // - we don't let client send messages before the event loop is running
+ ACE_Barrier wait(MAX_TEST_CLIENTS);
+ client_arg data;
+ data.server = theServer;
+ data.wait = &wait;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) starting clients\n"));
+ if (ACE_Thread_Manager::instance ()->spawn_n
+ (MAX_TEST_CLIENTS,
+ ACE_THR_FUNC (client),
+ (void *) &data,
+ THR_BOUND | THR_DETACHED) == -1)
+ ACE_ERROR ((LM_ERROR, "(%P|%t) %p\n%a", "spawn failed"));
+
+ // handle event
+ ACE_DEBUG((LM_DEBUG, "(%t)run event loop\n"));
+ ACE_Reactor::run_event_loop();
+
+ // wait until all server has completed
+ ACE_Thread_Manager::instance()->wait_task(&handler);
+}
+
+int
+main (int, char *[])
+{
+ ACE_START_TEST ("CLD_Connector_Test");
+
+ // initialize classix environment, such as reactor
+ ACE_CLASSIX_OS classix;
+
+ // running server and clients
+ ACE_CLASSIX_Port_Core server_port;
+ spawn(&server_port);
+
+
+ // Wait all the threads to exit.
+ ACE_Thread_Manager::instance ()->wait ();
+
+ ACE_END_TEST;
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Atomic_Op<ACE_Thread_Mutex, u_int>;
+template class ACE_Svc_Handler<ACE_CLASSIX_MCAST_DGRAM, ACE_MT_SYNCH>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Atomic_Op<ACE_Thread_Mutex, u_int>
+#pragma instantiate ACE_Svc_Handler<ACE_CLASSIX_MCAST_DGRAM, ACE_MT_SYNCH>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+
+