summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorOssama Othman <ossama-othman@users.noreply.github.com>2001-10-30 04:06:15 +0000
committerOssama Othman <ossama-othman@users.noreply.github.com>2001-10-30 04:06:15 +0000
commit1b7580c22dbbb7ecc04aef5c251d02388439440b (patch)
treecefab9421ba69aa46855e597a092c5b65c8c2837 /tests
parent832ab264678608a7fbb7f265948217914cd59bd1 (diff)
downloadATCD-1b7580c22dbbb7ecc04aef5c251d02388439440b.tar.gz
ChangeLogTag:Mon Oct 29 20:04:40 2001 Ossama Othman <ossama@uci.edu>
Diffstat (limited to 'tests')
-rw-r--r--tests/Proactor_Test.cpp346
1 files changed, 215 insertions, 131 deletions
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp
index dedfdee9ee4..d0c996af2aa 100644
--- a/tests/Proactor_Test.cpp
+++ b/tests/Proactor_Test.cpp
@@ -31,13 +31,13 @@ ACE_RCSID (tests,
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
+#include "ace/Object_Manager.h"
#include "ace/Get_Opt.h"
#include "ace/streams.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/Task.h"
-#include "ace/Object_Manager.h"
#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
@@ -58,7 +58,7 @@ static int disable_signal (int sigmin, int sigmax);
static int proactor_type = 0;
// POSIX : > 0 max number aio operations proactor,
-static int max_aio_operations = 0;
+static size_t max_aio_operations = 0;
// both: 0 run client or server / depends on host
// != 0 run client and server
@@ -69,15 +69,15 @@ static const ACE_TCHAR *host = 0;
// number of Senders instances
static int senders = 1;
-const int MAX_SENDERS = 100;
-const int MAX_RECEIVERS = 1000;
+const int MAX_SENDERS = 100;
+const int MAX_RECEIVERS = 1000;
// duplex mode: == 0 half-duplex
// != 0 full duplex
static int duplex = 0;
// number threads in the Proactor thread pool
-static int threads = 1;
+static size_t threads = 1;
// Port that we're receiving connections on.
static u_short port = ACE_DEFAULT_SERVER_PORT;
@@ -86,6 +86,10 @@ static u_short port = ACE_DEFAULT_SERVER_PORT;
static int logflag = 0; // 0 STDERR, 1 FILE
static int loglevel = 0; // 0 full , 1 only errors
+static const size_t MIN_TIME = 1; // min 1 sec
+static const size_t MAX_TIME = 3600; // max 1 hour
+static size_t seconds = 2; // default time to run - 2 seconds
+
static char data[] =
"GET / HTTP/1.1\r\n"
"Accept: */*\r\n"
@@ -93,7 +97,7 @@ static char data[] =
"Accept-Encoding: gzip, deflate\r\n"
"User-Agent: Proactor_Test/1.0 (non-compatible)\r\n"
"Connection: Keep-Alive\r\n"
- "\r\n" ;
+ "\r\n";
// *************************************************************
// MyTask is ACE_Task resposible for :
@@ -114,102 +118,172 @@ static char data[] =
class MyTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
- MyTask (void): threads_ (0), proactor_ (0) {}
+ MyTask (void): lock_ (), sem_ (0), proactor_(0) {}
- int svc (void);
- void waitready (void) { this->event_.wait (); }
+ virtual ~MyTask() { (void) this->stop ();}
+
+ virtual int svc (void);
+
+ int start (size_t num_threads,
+ int type_proactor,
+ size_t max_op );
+ int stop (void);
-private:
- void create_proactor (void);
- void delete_proactor (void);
private:
+ int create_proactor (int type_proactor,
+ size_t max_op);
+ int delete_proactor (void);
+
ACE_SYNCH_RECURSIVE_MUTEX lock_;
- int threads_;
- ACE_Proactor *proactor_;
- ACE_Manual_Event event_;
+ ACE_Thread_Semaphore sem_;
+ ACE_Proactor * proactor_;
+
};
-void
-MyTask::create_proactor (void)
+int
+MyTask::create_proactor (int type_proactor, size_t max_op )
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX,
- monitor,
- this->lock_);
+ ACE_GUARD_RETURN ( ACE_SYNCH_RECURSIVE_MUTEX,
+ monitor,
+ this->lock_,
+ -1);
+
+ ACE_ASSERT ( this->proactor_ == 0 );
- if (this->threads_ == 0)
- {
#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
- ACE_WIN32_Proactor *proactor = 0;
- ACE_NEW (proactor,
- ACE_WIN32_Proactor);
- ACE_DEBUG ((LM_DEBUG, "(%t) Create Proactor Type = WIN32\n"));
+ ACE_UNUSED_ARG ( type_proactor );
+ ACE_UNUSED_ARG ( max_op );
+
+ ACE_WIN32_Proactor *proactor = 0;
+
+ ACE_NEW_RETURN (proactor,
+ ACE_WIN32_Proactor,
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
#elif defined (ACE_HAS_AIO_CALLS)
- ACE_POSIX_Proactor *proactor = 0;
+ ACE_POSIX_Proactor * proactor = 0;
- switch (proactor_type)
- {
- case 1:
- ACE_NEW (proactor,
- ACE_POSIX_AIOCB_Proactor (max_aio_operations));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
- break;
+ switch (type_proactor)
+ {
+ case 1:
+ ACE_NEW_RETURN (proactor,
+ ACE_POSIX_AIOCB_Proactor (max_op),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
+ break;
- case 2:
- ACE_NEW (proactor,
- ACE_POSIX_SIG_Proactor);
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
- break;
+ case 2:
+ ACE_NEW_RETURN (proactor,
+ ACE_POSIX_SIG_Proactor (max_op),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
+ break;
# if defined (sun)
- case 3:
- ACE_NEW (proactor,
- ACE_SUN_Proactor (max_aio_operations));
- ACE_DEBUG ((LM_DEBUG, "(%t) Create Proactor Type = SUN\n"));
- break;
+ case 3:
+ ACE_NEW_RETURN (proactor,
+ ACE_SUN_Proactor (max_op),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%t) Create Proactor Type = SUN\n")));
+ break;
# endif /* sun */
- default:
- ACE_NEW (proactor,
- ACE_POSIX_SIG_Proactor);
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
- break;
- }
-
-#endif
+ default:
+ ACE_NEW_RETURN (proactor,
+ ACE_POSIX_SIG_Proactor (max_op),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
+ break;
+ }
- ACE_NEW (this->proactor_,
- ACE_Proactor (proactor, 1));
+#endif // (ACE_WIN32) && !defined (ACE_HAS_WINCE)
- ACE_Proactor::instance (this->proactor_);
- this->event_.signal ();
- }
+ ACE_NEW_RETURN ( this->proactor_,
+ ACE_Proactor (proactor, 1),
+ -1);
- this->threads_++;
+ ACE_Proactor::instance (this->proactor_);
+ return 0;
}
-void
+int
MyTask::delete_proactor (void)
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX,
- monitor,
- this->lock_);
+ ACE_GUARD_RETURN ( ACE_SYNCH_RECURSIVE_MUTEX,
+ monitor,
+ this->lock_,
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Delete Proactor\n")));
+
+ ACE_Proactor::instance ((ACE_Proactor *) 0);
+ delete this->proactor_;
+ this->proactor_ = 0;
+
+ return 0;
+}
+
+int
+MyTask::start( size_t num_threads,
+ int type_proactor,
+ size_t max_op )
+{
+ if ( this->create_proactor (type_proactor, max_op) == -1 )
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("unable to create proactor")),
+ -1);
+
+ if ( this->activate (THR_NEW_LWP, num_threads) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("unable to activate thread pool")),
+ -1);
+
+ for (; num_threads > 0; num_threads-- )
+ {
+ sem_.acquire ();
+ }
+
+ return 0;
+}
+
- if (--this->threads_ == 0)
+int
+MyTask::stop ()
+{
+ if ( this->proactor_ != 0 )
{
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Delete Proactor\n")));
- ACE_Proactor::instance ((ACE_Proactor *) 0);
- delete this->proactor_;
- this->proactor_ = 0;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("End Proactor event loop\n")));
+
+ ACE_Proactor::end_event_loop ();
}
+
+ if ( this->wait () == -1 )
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("unable to stop thread pool")));
+
+ if ( this->delete_proactor () == -1 )
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("unable to delete proactor")));
+
+ return 0;
}
int
@@ -217,14 +291,14 @@ MyTask::svc (void)
{
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask started\n")));
- this->create_proactor ();
::disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
+ // signal that we are ready
+ sem_.release (1);
+
while (ACE_Proactor::event_loop_done () == 0)
ACE_Proactor::run_event_loop ();
- this->delete_proactor ();
-
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask finished\n")));
return 0;
}
@@ -280,12 +354,11 @@ class Acceptor : public ACE_Asynch_Acceptor<Receiver>
{
friend class Receiver;
public:
+ int get_number_sessions (void) { return this->sessions_; }
Acceptor (void);
virtual ~Acceptor (void);
- long get_number_sessions (void) { return this->sessions_; }
-
void stop (void);
// Virtual from ACE_Asynch_Acceptor
@@ -296,14 +369,14 @@ private:
void on_delete_receiver (Receiver &rcvr);
ACE_SYNCH_RECURSIVE_MUTEX lock_;
- long sessions_;
+ int sessions_;
Receiver *list_receivers_[MAX_RECEIVERS];
};
Acceptor::Acceptor (void)
: sessions_ (0)
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_);
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
for (int i = 0; i < MAX_RECEIVERS; ++i)
this->list_receivers_[i] = 0;
@@ -320,7 +393,7 @@ Acceptor::stop(void)
// This method can be called only after proactor event loop is done
// in all threads.
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_);
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
for (int i = 0; i < MAX_RECEIVERS; ++i)
{
@@ -332,7 +405,7 @@ Acceptor::stop(void)
void
Acceptor::on_new_receiver (Receiver & rcvr)
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_);
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->sessions_++;
this->list_receivers_[rcvr.index_] = &rcvr;
ACE_DEBUG ((LM_DEBUG,
@@ -343,7 +416,7 @@ Acceptor::on_new_receiver (Receiver & rcvr)
void
Acceptor::on_delete_receiver (Receiver & rcvr)
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_);
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->sessions_--;
if (rcvr.index_ >= 0
@@ -352,14 +425,14 @@ Acceptor::on_delete_receiver (Receiver & rcvr)
this->list_receivers_[rcvr.index_] = 0;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Receiver::~DTOR sessions_=%d\n"),
+ ACE_TEXT ("Receiver::~DTOR sessions_ = %d\n"),
this->sessions_));
}
Receiver *
Acceptor::make_handler (void)
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, 0);
+ ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, 0);
if (this->sessions_ >= MAX_RECEIVERS)
return 0;
@@ -397,7 +470,7 @@ Receiver::~Receiver (void)
ACE_OS::closesocket (this->handle_);
this->index_ = -1;
- this->handle_= ACE_INVALID_HANDLE ;
+ this->handle_= ACE_INVALID_HANDLE;
}
// return true if we alive, false we commited suicide
@@ -405,7 +478,7 @@ int
Receiver::check_destroy (void)
{
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1);
+ ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
if (this->io_count_ > 0)
return 1;
@@ -437,7 +510,7 @@ Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
int
Receiver::initiate_read_stream (void)
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1);
+ ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb,
@@ -461,7 +534,7 @@ Receiver::initiate_read_stream (void)
int
Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes)
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1);
+ ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
if (nbytes <= 0)
{
mb.release ();
@@ -550,7 +623,7 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
mb.release ();
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_);
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->io_count_--;
}
@@ -617,7 +690,7 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
}
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_);
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->io_count_--;
}
@@ -631,7 +704,7 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
class Sender : public ACE_Service_Handler
{
public:
- static long get_number_sessions (void) { return sessions_; }
+ static int get_number_sessions (void) { return sessions_; }
static void init (void);
static int activate (int num);
static void stop (void);
@@ -668,17 +741,17 @@ private:
static Sender *list_senders_[MAX_SENDERS];
- static long sessions_;
+ static int sessions_;
};
-long Sender::sessions_ = 0;
+int Sender::sessions_ = 0;
Sender * Sender::list_senders_[MAX_SENDERS];
void
Sender::init (void)
{
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX,
- locker,
+ monitor,
*ACE_Static_Object_Lock::instance ());
for (int i = 0; i < MAX_SENDERS; ++i)
@@ -689,7 +762,7 @@ int
Sender::activate (int num)
{
ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
- locker,
+ monitor,
*ACE_Static_Object_Lock::instance (),
-1);
@@ -701,7 +774,7 @@ Sender::activate (int num)
int rc = 0;
- for (int i = 0; i < MAX_SENDERS && rc < num ; ++i)
+ for (int i = 0; i < MAX_SENDERS && rc < num; ++i)
{
if (list_senders_[i] != 0)
continue;
@@ -728,10 +801,10 @@ Sender::stop (void)
// in all threads
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX,
- locker,
+ monitor,
*ACE_Static_Object_Lock::instance ());
- for (int i = 0; i < MAX_SENDERS ; ++i)
+ for (int i = 0; i < MAX_SENDERS; ++i)
{
delete list_senders_[i];
list_senders_[i] = 0;
@@ -744,10 +817,11 @@ Sender::Sender (int index)
io_count_ (0)
{
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX,
- locker,
+ monitor,
*ACE_Static_Object_Lock::instance ());
+
this->sessions_++;
- this->list_senders_[index_] = this ;
+ this->list_senders_[index_] = this;
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Sender::CTOR sessions_ = %d\n"),
@@ -759,7 +833,7 @@ Sender::Sender (int index)
Sender::~Sender (void)
{
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX,
- locker,
+ monitor,
*ACE_Static_Object_Lock::instance ());
if (this->handle_ != ACE_INVALID_HANDLE)
@@ -771,7 +845,7 @@ Sender::~Sender (void)
this->sessions_--;
if (this->index_ >= 0)
{
- this->list_senders_[this->index_] = 0 ;
+ this->list_senders_[this->index_] = 0;
this->index_ = -1;
}
@@ -785,7 +859,7 @@ int
Sender::check_destroy (void)
{
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1);
+ ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
if (this->io_count_ > 0)
return 1;
@@ -834,7 +908,7 @@ int Sender::open_sender (const ACE_TCHAR *host, u_short port)
{
if (duplex != 0)
// Start an asynchronous read file
- this->initiate_read_stream () ;
+ this->initiate_read_stream ();
}
this->check_destroy ();
@@ -845,7 +919,7 @@ int Sender::open_sender (const ACE_TCHAR *host, u_short port)
int
Sender::initiate_write_stream (void)
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1);
+ ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
int nbytes = ACE_OS::strlen (send_buf_);
@@ -874,7 +948,7 @@ Sender::initiate_write_stream (void)
int
Sender::initiate_read_stream (void)
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_, -1);
+ ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb,
@@ -958,7 +1032,7 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
}
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_);
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->io_count_--;
}
@@ -1027,7 +1101,7 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
}
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, locker, this->lock_);
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->io_count_--;
}
@@ -1060,6 +1134,7 @@ print_usage (int /* argc */, ACE_TCHAR *argv[])
ACE_TEXT ("\n-v log level")
ACE_TEXT ("\n 0 - log all messages")
ACE_TEXT ("\n 1 - log only errors and unusual cases")
+ ACE_TEXT ("\n-i time to run in seconds")
ACE_TEXT ("\n-u show this message")
ACE_TEXT ("\n"),
argv[0]
@@ -1112,18 +1187,26 @@ parse_args (int argc, ACE_TCHAR *argv[])
#endif
threads = 3; // size of Proactor thread pool
senders = 20; // number of senders
- logflag = 1 ; // log to : 0 STDERR / 1 FILE
+ logflag = 1; // log to : 0 STDERR / 1 FILE
loglevel = 0; // log level : 0 full/ 1 only errors
+ seconds = 2; // time to run in seconds
return 0;
}
- ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("t:o:n:p:d:h:s:v:ub"));
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("i:t:o:n:p:d:h:s:v:ub"));
int c;
while ((c = get_opt ()) != EOF)
{
switch (c)
{
+ case 'i': // time to run
+ seconds = ACE_OS::atoi (get_opt.optarg);
+ if ( seconds < MIN_TIME )
+ seconds = MIN_TIME;
+ if ( seconds > MAX_TIME )
+ seconds = MAX_TIME;
+ break;
case 'b': // both client and server
both = 1;
break;
@@ -1174,38 +1257,39 @@ main (int argc, ACE_TCHAR *argv[])
::disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
::disable_signal (SIGPIPE, SIGPIPE);
- MyTask task1;
+ MyTask task1;
+ Acceptor acceptor;
- if (task1.activate (THR_NEW_LWP, threads) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p.\n"),
- ACE_TEXT ("main")),
- -1);
+ Sender::init ();
- // wait for creation of Proactor
- task1.waitready ();
+ if ( task1.start ( threads,
+ proactor_type,
+ max_aio_operations) == 0 )
+ {
+ int rc = 0;
- Acceptor acceptor;
- Sender::init ();
+ if (both != 0 || host == 0) // Acceptor
+ {
+ // Simplify, initial read with zero size
+ if (acceptor.open (ACE_INET_Addr (port), 0, 1) == 0)
+ rc = 1;
+ }
- int rc = 0;
+ if (both != 0 || host != 0)
+ {
+ if (host == 0)
+ host = ACE_TEXT ("localhost");
+
+ rc += Sender::activate (senders);
+ }
- if (both != 0 || host == 0) // Acceptor
- // Simplify, initial read with zero size
- if (acceptor.open (ACE_INET_Addr (port), 0, 1) == 0)
- rc = 1;
+ if ( rc > 0 )
+ ACE_OS::sleep (seconds);
- if (both != 0 || host != 0)
- {
- if (host == 0)
- host = ACE_TEXT ("localhost");
- rc += Sender::activate (senders);
}
- ACE_Proactor::end_event_loop ();
-
- task1.wait ();
+ task1.stop ();
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("\nNumber of Receivers objects = %d\n")