summaryrefslogtreecommitdiff
path: root/CIAO/ciao/FTComponentServer/StateSynchronizationAgent/StateSynchronizationAgent_i.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'CIAO/ciao/FTComponentServer/StateSynchronizationAgent/StateSynchronizationAgent_i.cpp')
-rw-r--r--CIAO/ciao/FTComponentServer/StateSynchronizationAgent/StateSynchronizationAgent_i.cpp376
1 files changed, 376 insertions, 0 deletions
diff --git a/CIAO/ciao/FTComponentServer/StateSynchronizationAgent/StateSynchronizationAgent_i.cpp b/CIAO/ciao/FTComponentServer/StateSynchronizationAgent/StateSynchronizationAgent_i.cpp
new file mode 100644
index 00000000000..ba3eaf6ca70
--- /dev/null
+++ b/CIAO/ciao/FTComponentServer/StateSynchronizationAgent/StateSynchronizationAgent_i.cpp
@@ -0,0 +1,376 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file StateSynchronizationAgent_i.cpp
+ *
+ * $Id$
+ *
+ * @author Friedhelm Wolf (fwolf@dre.vanderbilt.edu)
+ */
+//=============================================================================
+
+#include "StateSynchronizationAgent_i.h"
+#include "CorbaStateUpdate.h"
+
+#ifdef FLARE_USES_DDS
+# include "DDSStateUpdate_T.h"
+# include "StateDcps_impl.h"
+#endif
+
+StateSynchronizationAgent_i::StateSynchronizationAgent_i (
+ CORBA::ORB_ptr orb,
+ const std::string & host_id,
+ const std::string & process_id,
+ bool use_corba)
+ : orb_ (CORBA::ORB::_duplicate (orb)),
+ host_id_ (host_id),
+ process_id_ (process_id),
+#ifdef FLARE_USES_DDS
+ domain_id_ (0),
+ domain_participant_ (DDS::DomainParticipant::_nil ()),
+ publisher_ (DDS::Publisher::_nil ()),
+ subscriber_ (DDS::Subscriber::_nil ()),
+#endif /* FLARE_USES_DDS */
+ use_corba_ (use_corba)
+{
+#ifdef FLARE_USES_DDS
+ if (!use_corba_)
+ {
+ if (!this->create_participant ())
+ throw DDSFailure ("SSA could not create DDS participant\n");
+
+ if (!this->create_publisher ())
+ throw DDSFailure ("SSA could not create DDS publisher\n");
+
+ if (!this->create_subscriber ())
+ throw DDSFailure ("SSA could not create DDS subscriber\n");
+ }
+#endif /* FLARE_USES_DDS */
+}
+
+StateSynchronizationAgent_i::~StateSynchronizationAgent_i ()
+{
+#ifdef FLARE_USES_DDS
+ if (!use_corba_)
+ {
+ this->delete_subscriber ();
+ this->delete_publisher ();
+ this->delete_participant ();
+ }
+#endif /* FLARE_USES_DDS */
+}
+
+void
+StateSynchronizationAgent_i::state_changed (const char * object_id)
+{
+ ACE_DEBUG ((LM_TRACE,
+ "SSA::state_changed (%s) called.\n",
+ object_id));
+
+ // get application reference
+ ReplicatedApplication_var app;
+
+ if (application_map_.find (ACE_CString (object_id),
+ app) != 0)
+ {
+ ACE_DEBUG ((LM_ERROR,
+ "(%P|%t) SSA::state_changed () "
+ "could not find application for object id %s\n",
+ object_id));
+ return;
+ }
+
+ // get state from the application
+ CORBA::Any_var state;
+ try
+ {
+ state = app->get_state ();
+ }
+ catch (const CORBA::SystemException& ex)
+ {
+ ACE_DEBUG ((LM_ERROR,
+ "(%P|%t) SSA::state_changed () "
+ "exception whil calling the get_state method for application %s:\n"
+ "%s",
+ object_id, ex._info ().c_str ()));
+ return;
+ }
+
+ // send state to each element in the replica_map_
+ REPLICA_OBJECT_LIST replica_group;
+ if (replica_map_.find (ACE_CString (object_id),
+ replica_group) != 0)
+ {
+ ACE_DEBUG ((LM_ERROR,
+ "(%P|%t) SSA::state_changed () "
+ "could not find replicas for the application %s\n",
+ object_id));
+ return;
+ }
+
+ ReplicatedApplication_var replica;
+ for (REPLICA_OBJECT_LIST::iterator it = replica_group.begin ();
+ it != replica_group.end ();
+ ++it)
+ {
+ try
+ {
+ // set the state on this replica
+ (*it)->set_state (state.in ());
+ }
+ catch (const CORBA::SystemException& ex)
+ {
+ ACE_DEBUG ((LM_WARNING,
+ "(%P|%t) SSA::state_changed () "
+ "exception while contacting a server replica for %s.\n",
+ object_id));
+ }
+ }
+}
+
+void
+StateSynchronizationAgent_i::update_rank_list (const RankList & rank_list)
+{
+ if (use_corba_)
+ {
+ // protect operations on the map
+ ACE_Guard <ACE_Thread_Mutex> guard (replica_map_mutex_);
+
+ // reset content of the internal map
+ replica_map_.close();
+ replica_map_.open();
+
+ ACE_DEBUG ((LM_TRACE, "SSA::update_rank_list with:\n"));
+
+ // for each replication group in the replica group list
+ for (size_t i = 0; i < rank_list.length (); ++i)
+ {
+ ACE_DEBUG ((LM_TRACE, "\toid = %s (%d entries)\n",
+ rank_list[i].object_id.in (),
+ rank_list.length ()));
+
+ // use the application id as a key for the map
+ ACE_CString oid (rank_list[i].object_id);
+
+ // create a new list for every replication group
+ REPLICA_OBJECT_LIST replica_object_list;
+
+ // for each entry of a replica group
+ for (size_t j = 0; j < rank_list[i].ior_list.length (); ++j)
+ {
+ try
+ {
+ // it is assumed that the strings identifying rank_list are
+ // stringified object references and can be resolved
+ // and used to contact the corresponding StateSynchronizationAgent
+ replica_object_list.push_back (
+ STATEFUL_OBJECT_PTR (
+ new CorbaStateUpdate (
+ ReplicatedApplication::_narrow (rank_list[i].ior_list[j]))));
+ }
+ catch (const CORBA::SystemException& ex)
+ {
+ ACE_DEBUG ((LM_WARNING,
+ "(%P|%t) SSA::"
+ "update_replica_groups could not resolve stringified "
+ "object reference %s\n",
+ rank_list[i].ior_list[j].in ()));
+ }
+ }
+
+ // add one replication group to the map
+ replica_map_.bind (oid, replica_object_list);
+ }
+ } // end if (use_corba_)
+}
+
+void
+StateSynchronizationAgent_i::register_application (const char * object_id,
+ ReplicatedApplication_ptr app)
+{
+ ACE_DEBUG ((LM_TRACE, "SSA::register_application (%s) called.\n", object_id));
+
+ ACE_CString oid (object_id);
+
+ if (application_map_.bind (oid, ReplicatedApplication::_duplicate (app)) < 0)
+ {
+ ACE_DEBUG ((LM_WARNING,
+ "(%P|%t) SSA::register_application () "
+ "could not bind application %s to the map successfully\n",
+ object_id));
+ }
+
+#ifdef FLARE_USES_DDS
+
+ // if we use DDS for communication
+ if (!use_corba_)
+ {
+ try
+ {
+ // protect operations on the map
+ ACE_Guard <ACE_Thread_Mutex> guard (replica_map_mutex_);
+
+ ACE_DEBUG ((LM_TRACE, "SSA::register_application add DDS participant"
+ " for application %s\n", object_id));
+
+ // create a new list which will have only one entry for DDS
+ REPLICA_OBJECT_LIST replica_object_list;
+
+ // register a DDS participant for this application
+ replica_object_list.push_back (
+ STATEFUL_OBJECT_PTR (
+ new DDSStateUpdate_T <CORBA::Long,
+ State,
+ StateTypeSupport,
+ StateDataWriter,
+ StateDataReader,
+ StateSeq> (
+ oid.c_str (),
+ this->get_unique_id (oid.c_str ()),
+ domain_participant_.in (),
+ publisher_.in (),
+ subscriber_.in (),
+ app)));
+
+ ACE_CString oid (object_id);
+
+ // this should work without doing a rebind, since there is only
+ // one application of the same type per process
+ replica_map_.bind (oid, replica_object_list);
+ }
+ catch (const DDSFailure & ex)
+ {
+ std::cerr << "SSA::register_application () DDS problem : "
+ << ex.description ()
+ << std::endl;
+ }
+ }
+
+#endif /* FLARE_USES_DDS */
+}
+
+#ifdef FLARE_USES_DDS
+
+bool
+StateSynchronizationAgent_i::create_participant ()
+{
+ DDS::DomainParticipantFactory_var dpf
+ = DDS::DomainParticipantFactory::get_instance ();
+
+ if (CORBA::is_nil (dpf.in ()))
+ {
+ return false;
+ }
+
+ domain_participant_ =
+ dpf->create_participant (domain_id_,
+ PARTICIPANT_QOS_DEFAULT,
+ DDS::DomainParticipantListener::_nil (),
+ DDS::ANY_STATUS);
+
+ if (CORBA::is_nil (domain_participant_.in ()))
+ {
+ return false;
+ }
+
+ return true;
+}
+
+bool
+StateSynchronizationAgent_i::delete_participant ()
+{
+ DDS::DomainParticipantFactory_var dpf
+ = DDS::DomainParticipantFactory::get_instance ();
+
+ if (CORBA::is_nil (dpf.in ()))
+ {
+ return false;
+ }
+
+ DDS::ReturnCode_t status =
+ dpf->delete_participant (domain_participant_.in ());
+
+ if (status != DDS::RETCODE_OK)
+ {
+ return false;
+ }
+
+ return true;
+}
+
+bool
+StateSynchronizationAgent_i::create_publisher ()
+{
+ DDS::PublisherQos pub_qos;
+ domain_participant_->get_default_publisher_qos (pub_qos);
+
+ publisher_ =
+ domain_participant_->create_publisher (pub_qos,
+ DDS::PublisherListener::_nil (),
+ DDS::ANY_STATUS);
+
+ if (CORBA::is_nil (publisher_.in ()))
+ {
+ return false;
+ }
+
+ return true;
+}
+
+bool
+StateSynchronizationAgent_i::delete_publisher ()
+{
+ DDS::ReturnCode_t status =
+ domain_participant_->delete_publisher (publisher_.in ());
+
+ if (status != DDS::RETCODE_OK)
+ {
+ return false;
+ }
+
+ return true;
+}
+
+bool
+StateSynchronizationAgent_i::create_subscriber ()
+{
+ subscriber_ =
+ domain_participant_->create_subscriber (SUBSCRIBER_QOS_DEFAULT,
+ DDS::SubscriberListener::_nil (),
+ DDS::ANY_STATUS);
+
+ if (CORBA::is_nil (subscriber_.in ()))
+ {
+ return false;
+ }
+
+ return true;
+}
+
+bool
+StateSynchronizationAgent_i::delete_subscriber ()
+{
+ DDS::ReturnCode_t status =
+ domain_participant_->delete_subscriber (subscriber_.in ());
+
+ if (status != DDS::RETCODE_OK)
+ {
+ return false;
+ }
+
+ return true;
+}
+
+#endif /* FLARE_USES_DDS */
+
+std::string
+StateSynchronizationAgent_i::get_unique_id (const std::string & app_name)
+{
+ std::string unique_id (app_name);
+
+ // make name unique by adding host and process id
+ unique_id += "_" + host_id_ + "_" + process_id_;
+
+ return unique_id;
+}