diff options
Diffstat (limited to 'TAO/tests/Bug_2417_Regression/publisher_impl.cpp')
-rw-r--r-- | TAO/tests/Bug_2417_Regression/publisher_impl.cpp | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/TAO/tests/Bug_2417_Regression/publisher_impl.cpp b/TAO/tests/Bug_2417_Regression/publisher_impl.cpp new file mode 100644 index 00000000000..e62ad292470 --- /dev/null +++ b/TAO/tests/Bug_2417_Regression/publisher_impl.cpp @@ -0,0 +1,165 @@ +// $Id$ + +#include "publisher_impl.h" +#include "ace/OS_NS_sys_time.h" +#include "ace/Task.h" +#include "ace/Process_Mutex.h" +#include "ace/OS_NS_unistd.h" +#include <vector> + +using namespace std; + +struct _Subscriber +{ + Subscriber_var subscriber; + bool unsubscribed; + unsigned int count; +}; + +class Publisher_impl::Worker : public ACE_Task_Base +{ +public: + Worker(Publisher_impl * _owner); + ~Worker(); + void addSubscriber(Subscriber_ptr subscriber); + virtual int svc (void); + void terminate(); +private: + bool terminated; + vector<_Subscriber> subscribers; + ACE_Thread_Mutex mutex; + Publisher_impl * owner; +}; + +Publisher_impl::Worker::Worker(Publisher_impl * _owner) +: owner(_owner) +{ + terminated = false; +} + +Publisher_impl::Worker::~Worker() +{ + terminated = true; +} + +void Publisher_impl::Worker::addSubscriber(Subscriber_ptr subscriber) +{ + ACE_Guard<ACE_Thread_Mutex> guard(mutex, 1, 1); + subscribers.push_back(_Subscriber()); + _Subscriber& s = subscribers.back(); + s.unsubscribed = false; + s.subscriber = Subscriber::_duplicate(subscriber); + s.count = 0; +} + +int Publisher_impl::Worker::svc (void) +{ + double data = 0.0; + bool doShutdown = false; + unsigned long iteration = 0; + ACE_Time_Value tv; + tv.set(0.01); + while (!terminated) + { + data += 0.01; + ++iteration; + { + ACE_Guard<ACE_Thread_Mutex> guard(mutex, 1, 1); + doShutdown = subscribers.size() > 0; + for (vector<_Subscriber>::iterator iter = subscribers.begin(); iter != subscribers.end(); ++iter) + { + if (!iter->unsubscribed) + { + doShutdown = false; + try + { + if (!CORBA::is_nil(iter->subscriber.in ())) + iter->subscriber->onData(data); + else + iter->unsubscribed = true; + ++iter->count; + } + catch (...) + { + iter->unsubscribed = true; + } + } + } + } + if (iteration % 1000 == 0) + { + ACE_Guard<ACE_Thread_Mutex> guard(mutex, 1, 1); + for (vector<_Subscriber>::iterator iter = subscribers.begin(); iter != subscribers.end(); ++iter) + { + if (!iter->unsubscribed) + { + try + { + iter->subscriber->isAlive(); + } + catch (...) + { + iter->unsubscribed = true; + } + } + } + } + if (doShutdown) + owner->shutdown(); + else + ACE_OS::sleep(tv); + } + return 0; +} + +void Publisher_impl::Worker::terminate() +{ + terminated = true; +} + +Publisher_impl::Publisher_impl(CORBA::ORB_ptr orb) +: orb_ (CORBA::ORB::_duplicate (orb)) +{ + worker = new Worker(this); + worker->activate(); +} + +Publisher_impl::~Publisher_impl() +{ + worker->terminate(); + worker->thr_mgr()->wait(); + delete worker; +} + +void subscribe ( + ::Subscriber_ptr subscriber + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + ::CORBA::SystemException + )); + +void +Publisher_impl::subscribe( + ::Subscriber_ptr subscriber + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + ::CORBA::SystemException + )) +{ + worker->addSubscriber(subscriber); +} + +void +Publisher_impl::shutdown ( + ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + ::CORBA::SystemException + )) +{ + this->orb_->shutdown (0); + worker->terminate(); +} + |