summaryrefslogtreecommitdiff
path: root/modules/CIAO/connectors/dds4ccm/examples/Quoter/Distributor/Distributor_exec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'modules/CIAO/connectors/dds4ccm/examples/Quoter/Distributor/Distributor_exec.cpp')
-rw-r--r--modules/CIAO/connectors/dds4ccm/examples/Quoter/Distributor/Distributor_exec.cpp386
1 files changed, 386 insertions, 0 deletions
diff --git a/modules/CIAO/connectors/dds4ccm/examples/Quoter/Distributor/Distributor_exec.cpp b/modules/CIAO/connectors/dds4ccm/examples/Quoter/Distributor/Distributor_exec.cpp
new file mode 100644
index 00000000000..1b41c4c0193
--- /dev/null
+++ b/modules/CIAO/connectors/dds4ccm/examples/Quoter/Distributor/Distributor_exec.cpp
@@ -0,0 +1,386 @@
+// -*- C++ -*-
+//
+// $Id$
+
+/**
+ * Code generated by the The ACE ORB (TAO) IDL Compiler v1.7.2
+ * TAO and the TAO IDL Compiler have been developed by:
+ * Center for Distributed Object Computing
+ * Washington University
+ * St. Louis, MO
+ * USA
+ * http://www.cs.wustl.edu/~schmidt/doc-center.html
+ * and
+ * Distributed Object Computing Laboratory
+ * University of California at Irvine
+ * Irvine, CA
+ * USA
+ * http://doc.ece.uci.edu/
+ * and
+ * Institute for Software Integrated Systems
+ * Vanderbilt University
+ * Nashville, TN
+ * USA
+ * http://www.isis.vanderbilt.edu/
+ *
+ * Information about TAO is available at:
+ * http://www.cs.wustl.edu/~schmidt/TAO.html
+ **/
+
+// TAO_IDL - Generated from
+// be/be_codegen.cpp:1278
+
+#include "Distributor_exec.h"
+#include "ace/Reactor.h"
+#include "ace/Guard_T.h"
+#include "ace/CORBA_macros.h"
+#include "ace/OS_NS_time.h"
+#include <iostream>
+
+namespace CIAO_Quoter_Distributor_Impl
+{
+ //============================================================
+ // Component Executor Implementation Class: Distributor_exec_i
+ //============================================================
+
+ pulse_Generator::pulse_Generator (Distributor_exec_i &callback)
+ : active_ (0),
+ pulse_callback_ (callback)
+ {
+ // initialize the reactor
+ this->reactor (ACE_Reactor::instance ());
+ }
+
+ pulse_Generator::~pulse_Generator ()
+ {
+ }
+
+ int
+ pulse_Generator::open_h ()
+ {
+ // convert the task into a active object that runs in separate thread
+ return this->activate ();
+ }
+
+ int
+ pulse_Generator::close_h ()
+ {
+ this->reactor ()->end_reactor_event_loop ();
+
+ // wait for all threads in the task to exit before it returns
+ return this->wait ();
+ }
+
+ int
+ pulse_Generator::start (CORBA::ULong hertz)
+ {
+ // return if not valid
+ if (hertz == 0 || this->active_ != 0)
+ {
+ return -1;
+ }
+
+ // calculate the interval time
+ long usec = 1000000 / hertz;
+
+ std::cerr << "Starting pulse_generator with hertz of " << hertz << ", interval of "
+ << usec << std::endl;
+
+ if (this->reactor ()->schedule_timer (this,
+ 0,
+ ACE_Time_Value (0, usec),
+ ACE_Time_Value (0, usec)) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to setup Timer\n"),
+ -1);
+ }
+
+ this->active_ = 1;
+ return 0;
+ }
+
+ int
+ pulse_Generator::stop (void)
+ {
+ // return if not valid.
+ if (this->active_ == 0)
+ {
+ return -1;
+ }
+ // cancle the timer
+ this->reactor ()->cancel_timer (this);
+ this->active_ = 0;
+ return 0;
+ }
+
+ int
+ pulse_Generator::active (void)
+ {
+ return this->active_;
+ }
+
+ int
+ pulse_Generator::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("[%x] handle = %d, close_mask = %d\n"),
+ this,
+ handle,
+ close_mask));
+ return 0;
+ }
+
+ int
+ pulse_Generator::handle_timeout (const ACE_Time_Value &,
+ const void *)
+ {
+ // Notify the subscribers
+ this->pulse_callback_.tick ();
+ return 0;
+ }
+
+ int
+ pulse_Generator::svc (void)
+ {
+ // define the owner of the reactor thread
+ this->reactor ()->owner (ACE_OS::thr_self ());
+
+ // run event loop to wait for event, and then dispatch them to corresponding handlers
+ this->reactor ()->run_reactor_event_loop ();
+
+ return 0;
+ }
+
+ Distributor_exec_i::Distributor_exec_i (void)
+ : rate_ (1)
+ {
+ ACE_OS::srand (static_cast <u_int> (ACE_OS::time ()));
+ this->ticker_ = new pulse_Generator (*this);
+ }
+
+ Distributor_exec_i::~Distributor_exec_i (void)
+ {
+ }
+
+ // Supported operations and attributes.
+
+ void
+ Distributor_exec_i::tick (void)
+ {
+ std::cerr << "Ticking" << std::endl;
+
+ for (Stock_Table::iterator i = this->stocks_.begin ();
+ i != this->stocks_.end ();
+ ++i)
+ {
+ if (ACE_OS::rand () % 2)
+ {
+ //std::cerr << "Updating stock: " << i->first.c_str () << std::endl;
+
+ int delta = (ACE_OS::rand () % 10) - 2;
+
+ i->second->current += delta;
+
+ if (i->second->current > i->second->high)
+ i->second->high = i->second->current;
+
+ if (i->second->current < i->second->low)
+ i->second->low = i->second->current;
+
+ if (!CORBA::is_nil (this->writer_)) {
+ printf ("WRITE AND CREATE stock_info for <%s> %u:%u:%u\n",
+ i->first.c_str(),
+ i->second->low,
+ i->second->current,
+ i->second->high);
+ this->writer_->write (i->second);
+ try
+ {
+ this->updater_->create (i->second);
+ }
+ catch (CCM_DDS::AlreadyCreated& )
+ {
+ printf ("Stock_info for <%s> already created.\n",
+ i->first.c_str ());
+ }
+ catch (CCM_DDS::InternalError& )
+ {
+ printf ("Internal Error while creating Stock_info for <%s>.\n",
+ i->first.c_str ());
+ }
+ }
+ else
+ std::cerr << "Writer reference is nil!" << std::endl;
+ }
+ else
+ {
+ if (!CORBA::is_nil (this->updater_))
+ {
+ i->second->current = ACE_OS::rand () % 50;
+ i->second->high = i->second->current + ACE_OS::rand () % 50;
+ i->second->low = ACE_OS::rand () % 50;
+ try
+ {
+ this->updater_->update (i->second);
+ printf ("Updated stock_info for <%s> %u:%u:%u\n",
+ i->first.c_str(),
+ i->second->low,
+ i->second->current,
+ i->second->high);
+ }
+ catch (CCM_DDS::NonExistent& )
+ {
+ printf ("Stock_info for <%s> not updated: <%s> didn't exist.\n",
+ i->first.c_str (), i->first.c_str ());
+ }
+ catch (CCM_DDS::InternalError& )
+ {
+ printf ("Internal Error while updating Stock_info for <%s>.\n",
+ i->first.c_str ());
+ }
+ }
+ else
+ std::cerr << "Updater reference is nil!" << std::endl;
+ }
+ }
+ }
+
+ void
+ Distributor_exec_i::add_stock (
+ const char * stock)
+ {
+ std::cerr << "Distributor_exec_i::add_stock - Adding stock: " << stock << std::endl;
+
+ ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, _guard,
+ this->mutex_, CORBA::INTERNAL ());
+
+ Quoter::Stock_Info *new_stock = new Quoter::Stock_Info;
+
+ new_stock->low = 50;
+ new_stock->high = 50;
+ new_stock->current = 50;
+ new_stock->symbol = stock;
+
+ this->stocks_[stock] = new_stock;
+ }
+
+ void
+ Distributor_exec_i::del_stock (const char * stock)
+ {
+ std::cerr << "Distributor_exec_i::del_stock - Removing stock: " << stock << std::endl;
+
+ ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, _guard,
+ this->mutex_, CORBA::INTERNAL ());
+
+ Stock_Table::iterator pos = this->stocks_.find (stock);
+
+ if (pos != this->stocks_.end ())
+ {
+ this->stocks_.erase (pos);
+ }
+
+ std::cerr << "Distributor_exec_i::del_stock - Stock no present: " << stock;
+ }
+
+ void
+ Distributor_exec_i::start (void)
+ {
+ this->ticker_->start (this->rate_);
+ }
+
+ void
+ Distributor_exec_i::stop (void)
+ {
+ for (Stock_Table::iterator i = this->stocks_.begin ();
+ i != this->stocks_.end ();
+ ++i)
+ {
+ printf ("Unregister <%s>\n", i->first.c_str ());
+ this->updater_->_cxx_delete (i->second);
+ }
+ this->ticker_->stop ();
+ }
+
+ // Component attributes.
+
+ ::CORBA::ULong
+ Distributor_exec_i::rate (void)
+ {
+ return this->rate_;
+ }
+
+ void
+ Distributor_exec_i::rate (
+ ::CORBA::ULong rate )
+ {
+ this->rate_ = rate;
+ }
+
+ // Port operations.
+
+ // Operations from Components::SessionComponent.
+
+ void
+ Distributor_exec_i::set_session_context (
+ ::Components::SessionContext_ptr ctx)
+ {
+ ::Quoter::CCM_Distributor_Context_var lctx =
+ ::Quoter::CCM_Distributor_Context::_narrow (ctx);
+
+ if ( ::CORBA::is_nil (lctx.in ()))
+ {
+ throw ::CORBA::INTERNAL ();
+ }
+
+ this->context_ = lctx;
+ }
+
+ void
+ Distributor_exec_i::configuration_complete (void)
+ {
+ this->writer_ = this->context_->get_connection_info_in_data ();
+ this->updater_ = this->context_->get_connection_info_update_data ();
+ this->ticker_->activate ();
+ }
+
+ void
+ Distributor_exec_i::ccm_activate (void)
+ {
+ this->start ();
+ this->add_stock ("MSFT");
+ this->add_stock ("IBM");
+ this->add_stock ("HP");
+ this->add_stock ("DELL");
+ this->add_stock ("ACER");
+ this->add_stock ("ASUS");
+ }
+
+ void
+ Distributor_exec_i::ccm_passivate (void)
+ {
+ /* Your code here. */
+ }
+
+ void
+ Distributor_exec_i::ccm_remove (void)
+ {
+ /* Your code here. */
+ //this->stop ();
+ }
+
+ extern "C" DISTRIBUTOR_EXEC_Export ::Components::EnterpriseComponent_ptr
+ create_Quoter_Distributor_Impl (void)
+ {
+ ::Components::EnterpriseComponent_ptr retval =
+ ::Components::EnterpriseComponent::_nil ();
+
+ ACE_NEW_NORETURN (
+ retval,
+ Distributor_exec_i);
+
+ return retval;
+ }
+}
+