// $Id$ // ============================================================================ // // = LIBRARY // tests // // = FILENAME // Future_Test.cpp // // = DESCRIPTION // This example tests the ACE Future and illustrates an // implementation of the Active Object pattern, which is available // at . The // Active Object itself is very simple -- it determines if numbers // are prime. // // = AUTHOR // Andres Kruse , // Douglas C. Schmidt , // and Per Andersson // // ============================================================================ #include "test_config.h" #include "ace/ACE.h" #include "ace/Task.h" #include "ace/Synch.h" #include "ace/Message_Queue.h" #include "ace/Future.h" #include "ace/Method_Request.h" #include "ace/Activation_Queue.h" #include "ace/Auto_Ptr.h" #include "ace/Atomic_Op.h" ACE_RCSID(tests, Future_Test, "$Id$") #if defined (ACE_HAS_THREADS) typedef ACE_Atomic_Op ATOMIC_INT; // A counter for the tasks.. static ATOMIC_INT task_count (0); // A counter for the futures.. static ATOMIC_INT future_count (0); // A counter for the capsules.. static ATOMIC_INT capsule_count (0); // A counter for the method requests... static ATOMIC_INT method_request_count (0); class Prime_Scheduler : public ACE_Task_Base { // = TITLE // Prime number scheduler for the Active Object. // // = DESCRIPTION // This class also plays the role of the Proxy and the Servant // in the Active Object pattern. Naturally, these roles could // be split apart from the Prime_Scheduler. friend class Method_Request_work; friend class Method_Request_name; friend class Method_Request_end; public: // = Initialization and termination methods. Prime_Scheduler (const ACE_TCHAR *, Prime_Scheduler * = 0); // Constructor. virtual int open (void *args = 0); // Initializer. virtual int close (u_long flags = 0); // Terminator. virtual ~Prime_Scheduler (void); // Destructor. // = These methods are part of the Active Object Proxy interface. ACE_Future work (u_long param, int count = 1); ACE_Future name (void); void end (void); protected: virtual int svc (void); // Runs the Prime_Scheduler's event loop, which dequeues // and dispatches them. // = These are the Servant methods that do the actual work. u_long work_i (u_long, int); const ACE_TCHAR *name_i (void); private: // = These are the implementation details. ACE_TCHAR *name_; ACE_Activation_Queue activation_queue_; Prime_Scheduler *scheduler_; }; class Method_Request_work : public ACE_Method_Request { // = TITLE // Reification of the method. public: Method_Request_work (Prime_Scheduler *, u_long, int, ACE_Future &); virtual ~Method_Request_work (void); virtual int call (void); // This is the entry point into the Active Object method. private: Prime_Scheduler *scheduler_; u_long param_; // Parameter to the method that's used to determine if a number if // prime. int count_; // Unused. ACE_Future future_result_; // Store the result of the Future. }; Method_Request_work::Method_Request_work (Prime_Scheduler *new_Prime_Scheduler, u_long new_param, int new_count, ACE_Future &new_result) : scheduler_ (new_Prime_Scheduler), param_ (new_param), count_ (new_count), future_result_ (new_result) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Method_Request_work created\n"))); } Method_Request_work::~Method_Request_work (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Method_Request_work will be deleted.\n"))); } int Method_Request_work::call (void) { // Dispatch the Servant's operation and store the result into the // Future. return this->future_result_.set (this->scheduler_->work_i (this->param_, this->count_)); } class Method_Request_name : public ACE_Method_Request { // = TITLE // Reification of the method. public: Method_Request_name (Prime_Scheduler *, ACE_Future &); virtual ~Method_Request_name (void); virtual int call (void); // This is the entry point into the Active Object method. private: Prime_Scheduler *scheduler_; ACE_Future future_result_; }; Method_Request_name::Method_Request_name (Prime_Scheduler *new_scheduler, ACE_Future &new_result) : scheduler_ (new_scheduler), future_result_ (new_result) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Method_Request_name created\n"))); } Method_Request_name::~Method_Request_name (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Method_Request_name will be deleted.\n"))); } int Method_Request_name::call (void) { // Dispatch the Servant's operation and store the result into the // Future. return future_result_.set (scheduler_->name_i ()); } class Method_Request_end : public ACE_Method_Request { // = TITLE // Reification of the method. public: Method_Request_end (Prime_Scheduler *new_Prime_Scheduler); virtual ~Method_Request_end (void); virtual int call (void); private: Prime_Scheduler *scheduler_; }; Method_Request_end::Method_Request_end (Prime_Scheduler *scheduler) : scheduler_ (scheduler) { } Method_Request_end::~Method_Request_end (void) { } int Method_Request_end::call (void) { // Shut down the scheduler. this->scheduler_->close (); return -1; } // Constructor Prime_Scheduler::Prime_Scheduler (const ACE_TCHAR *newname, Prime_Scheduler *new_scheduler) : scheduler_ (new_scheduler) { ACE_NEW (this->name_, ACE_TCHAR[ACE_OS::strlen (newname) + 1]); ACE_OS::strcpy ((ACE_TCHAR *) this->name_, newname); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Prime_Scheduler %s created\n"), this->name_)); } // Destructor Prime_Scheduler::~Prime_Scheduler (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Prime_Scheduler %s will be destroyed\n"), this->name_)); delete [] this->name_; } // open int Prime_Scheduler::open (void *) { task_count++; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Prime_Scheduler %s open\n"), this->name_)); // Become an Active Object. return this->activate (THR_BOUND | THR_DETACHED); } // close int Prime_Scheduler::close (u_long) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Prime_Scheduler %s close\n"), this->name_)); task_count--; return 0; } // Service.. int Prime_Scheduler::svc (void) { for (;;) { // Dequeue the next method request (we use an auto pointer in // case an exception is thrown in the ). auto_ptr mo (this->activation_queue_.dequeue ()); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) calling method request\n"))); // Call it. if (mo->call () == -1) break; // Destructor automatically deletes it. } /* NOTREACHED */ return 0; } void Prime_Scheduler::end (void) { this->activation_queue_.enqueue (new Method_Request_end (this)); } // Here's where the Work takes place. We compute if the parameter is // a prime number. u_long Prime_Scheduler::work_i (u_long param, int count) { ACE_UNUSED_ARG (count); return ACE::is_prime (param, 2, param / 2); } const ACE_TCHAR * Prime_Scheduler::name_i (void) { return this->name_; } ACE_Future Prime_Scheduler::name (void) { if (this->scheduler_) // Delegate to the Prime_Scheduler. return this->scheduler_->name (); else { ACE_Future new_future; // @@ What happens if new fails here? this->activation_queue_.enqueue (new Method_Request_name (this, new_future)); return new_future; } } ACE_Future Prime_Scheduler::work (u_long newparam, int newcount) { if (this->scheduler_) { return this->scheduler_->work (newparam, newcount); } else { ACE_Future new_future; this->activation_queue_.enqueue (new Method_Request_work (this, newparam, newcount, new_future)); return new_future; } } // @@ These values should be set by the command line options! // Total number of loops. static int n_loops = 100; #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Atomic_Op; template class ACE_Atomic_Op_Ex; template class ACE_Future; template class ACE_Future; template class ACE_Future; template class ACE_Future_Rep; template class ACE_Future_Rep; template class ACE_Future_Rep; template class auto_ptr; template class ACE_Auto_Basic_Ptr; template class ACE_Node *>; template class ACE_Node *>; template class ACE_Node *>; template class ACE_Unbounded_Set *>; template class ACE_Unbounded_Set *>; template class ACE_Unbounded_Set *>; template class ACE_Unbounded_Set_Iterator *>; template class ACE_Unbounded_Set_Iterator *>; template class ACE_Unbounded_Set_Iterator *>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Atomic_Op #pragma instantiate ACE_Atomic_Op_Ex #pragma instantiate ACE_Future #pragma instantiate ACE_Future #pragma instantiate ACE_Future #pragma instantiate ACE_Future_Rep #pragma instantiate ACE_Future_Rep #pragma instantiate ACE_Future_Rep #pragma instantiate auto_ptr #pragma instantiate ACE_Auto_Basic_Ptr #pragma instantiate ACE_Node *> #pragma instantiate ACE_Node *> #pragma instantiate ACE_Node *> #pragma instantiate ACE_Unbounded_Set *> #pragma instantiate ACE_Unbounded_Set *> #pragma instantiate ACE_Unbounded_Set *> #pragma instantiate ACE_Unbounded_Set_Iterator *> #pragma instantiate ACE_Unbounded_Set_Iterator *> #pragma instantiate ACE_Unbounded_Set_Iterator *> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ #endif /* ACE_HAS_THREADS */ int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_START_TEST (ACE_TEXT ("Future_Test")); #if defined (ACE_HAS_THREADS) // @@ Should make these be s... Prime_Scheduler *andres, *peter, *helmut, *matias; // Create active objects.. ACE_NEW_RETURN (andres, Prime_Scheduler (ACE_TEXT ("andres")), -1); ACE_ASSERT (andres->open () != -1); ACE_NEW_RETURN (peter, Prime_Scheduler (ACE_TEXT ("peter")), -1); ACE_ASSERT (peter->open () != -1); ACE_NEW_RETURN (helmut, Prime_Scheduler (ACE_TEXT ("helmut")), -1); ACE_ASSERT (helmut->open () != -1); // Matias passes all asynchronous method calls on to Andres... ACE_NEW_RETURN (matias, Prime_Scheduler (ACE_TEXT ("matias"), andres), -1); ACE_ASSERT (matias->open () != -1); for (int i = 0; i < n_loops; i++) { { ACE_Future fresulta; ACE_Future fresultb; ACE_Future fresultc; ACE_Future fresultd; ACE_Future fresulte; ACE_Future fname; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) going to do a non-blocking call\n"))); // Spawn off the methods, which run in a separate thread as // active object invocations. fresulta = andres->work (9013); fresultb = peter->work (9013); fresultc = helmut->work (9013); fresultd = matias->work (9013); fname = andres->name (); // See if the result is available... if (fresulta.ready ()) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) wow.. work is ready.....\n"))); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) non-blocking call done... now blocking...\n"))); // Save the result of fresulta. fresulte = fresulta; if (i % 3 == 0) { // Every 3rd time... disconnect the futures... but // "fresulte" should still contain the result... fresulta.cancel (10ul); fresultb.cancel (20ul); fresultc.cancel (30ul); fresultd.cancel (40ul); } u_long resulta = 0, resultb = 0, resultc = 0, resultd = 0, resulte = 0; fresulta.get (resulta); fresultb.get (resultb); fresultc.get (resultc); fresultd.get (resultd); fresulte.get (resulte); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) result a %u\n") ACE_TEXT ("(%t) result b %u\n") ACE_TEXT ("(%t) result c %u\n") ACE_TEXT ("(%t) result d %u\n") ACE_TEXT ("(%t) result e %u\n"), (u_int) resulta, (u_int) resultb, (u_int) resultc, (u_int) resulte, (u_int) resultd)); const ACE_TCHAR *name; fname.get (name); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) name %s\n"), name)); } ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) task_count %d future_count %d ") ACE_TEXT ("capsule_count %d method_request_count %d\n"), task_count.value (), future_count.value (), capsule_count.value (), method_request_count.value ())); } // Close things down. andres->end (); peter->end (); helmut->end (); matias->end (); ACE_OS::sleep (2); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) task_count %d future_count %d ") ACE_TEXT ("capsule_count %d method_request_count %d\n"), task_count.value (), future_count.value (), capsule_count.value (), method_request_count.value ())); { // Check if set then get works, older versions of // will lock forever (or until the timer expires), will use a // small timer value to avoid blocking the process. ACE_Future f1; f1.set (100); // Note you need to use absolute time, not relative time. ACE_Time_Value timeout (ACE_OS::gettimeofday () + ACE_Time_Value (10)); int value = 0; if (f1.get (value, &timeout) == 0 && value == 100) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Ace_Future::Set followed by Ace_Future::Get works.\n"))); else ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Future::Set followed by Ace_Future::Get does ") ACE_TEXT ("not work, broken Ace_Future<> implementation.\n"))); } { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Checking if Ace_Future::operator= is implemented ") ACE_TEXT ("incorrectly this might crash the program.\n"))); ACE_Future f1; { // To ensure that a rep object is created. ACE_Future f2 (f1); } // Now it is one ACE_Future referencing the rep instance ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("0.\n"))); //Check that self assignment works. f1 = f1; // Is there any repesentation left, and if so what is the ref // count older ACE_Future<> implementations have deleted the rep // instance at this moment // The stuff below might crash the process if the // implementation was bad. int value = 0; ACE_Time_Value timeout (ACE_OS::gettimeofday () + ACE_Time_Value (10)); f1.set (100); f1.get (value, &timeout); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("1.\n"))); { // Might delete the same data a couple of times. ACE_Future f2 (f1); f1.set (100); f1.get (value, &timeout); } ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("2.\n"))); { ACE_Future f2 (f1); f1.set (100); f1.get (value, &timeout); } ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("3.\n"))); { ACE_Future f2 (f1); f1.set (100); f1.get (value, &timeout); } ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("4.\n"))); { ACE_Future f2 (f1); f1.set (100); f1.get (value, &timeout); } ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("5.\n"))); { ACE_Future f2 (90); f2.get (value, &timeout); f1.get (value, &timeout); } } ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("No it did not crash the program.\n"))); ACE_OS::sleep (5); delete andres; delete peter; delete helmut; delete matias; #else ACE_ERROR ((LM_INFO, ACE_TEXT ("threads not supported on this platform\n"))); #endif /* ACE_HAS_THREADS */ ACE_END_TEST; return 0; }