summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp391
1 files changed, 391 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp b/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp
new file mode 100644
index 00000000000..76634ae1217
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/LWFT/StateSynchronizationAgent_i.cpp
@@ -0,0 +1,391 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file StateSynchronizationAgent_i.cpp
+ *
+ * $Id$
+ *
+ * @author Friedhelm Wolf (fwolf@dre.vanderbilt.edu)
+ */
+//=============================================================================
+
+#include <string>
+
+#include "StateSynchronizationAgent_i.h"
+#include "CorbaStateUpdate.h"
+
+#ifdef FLARE_USES_DDS
+# include "DDSStateUpdate_T.h"
+#endif
+
+StateSynchronizationAgent_i::StateSynchronizationAgent_i (
+ CORBA::ORB_ptr orb,
+ const std::string & host_id,
+ const std::string & process_id,
+ bool use_corba)
+ : orb_ (CORBA::ORB::_duplicate (orb)),
+ host_id_ (host_id),
+ process_id_ (process_id),
+#if defined (FLARE_USES_DDS)
+ domain_id_ (0),//"FLAREDomain"),
+ domain_participant_ (DDS::DomainParticipant::_nil ()),
+ publisher_ (DDS::Publisher::_nil ()),
+ subscriber_ (DDS::Subscriber::_nil ()),
+#endif /* FLARE_USES_DDS */
+ use_corba_ (use_corba)
+{
+#if defined (FLARE_USES_DDS)
+ if (!use_corba_)
+ {
+ if (!this->create_participant ())
+ {
+ throw DDSFailure ("SSA could not create DDS participant\n");
+ }
+
+ if (!this->create_publisher ())
+ {
+ throw DDSFailure ("SSA could not create DDS publisher\n");
+ }
+
+ if (!this->create_subscriber ())
+ {
+ throw DDSFailure ("SSA could not create DDS subscriber\n");
+ }
+ }
+#endif /* FLARE_USES_DDS */
+}
+
+StateSynchronizationAgent_i::~StateSynchronizationAgent_i (void)
+{
+#ifdef FLARE_USES_DDS
+ if (!use_corba_)
+ {
+ DDS::ReturnCode_t status =
+ domain_participant_->delete_contained_entities ();
+
+ if (status != DDS::RETCODE_OK)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "SSA - "
+ "Bad retcode from delete_contained_entities()\n"));
+ }
+
+ if (! this->delete_participant ())
+ {
+ ACE_ERROR ((LM_ERROR,
+ "SSA - "
+ "Bad retcode from delete_participant()\n"));
+ }
+ }
+#endif /* FLARE_USES_DDS */
+}
+
+void
+StateSynchronizationAgent_i::state_changed (const char * object_id)
+{
+ /*
+ ACE_DEBUG ((LM_TRACE,
+ "SSA::state_changed (%s) called.\n",
+ object_id));
+ */
+ // get application reference
+ ReplicatedApplication_var app;
+
+ if (application_map_.find (ACE_CString (object_id),
+ app) != 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "(%P|%t) SSA::state_changed () "
+ "could not find application for object id %s\n",
+ object_id));
+ return;
+ }
+
+ // Get state from the application.
+ CORBA::Any_var state;
+
+ try
+ {
+ state = app->get_state ();
+ }
+ catch (const CORBA::SystemException& ex)
+ {
+ ACE_DEBUG ((LM_ERROR,
+ "(%P|%t) SSA::state_changed () "
+ "exception while calling the "
+ "get_state method for application "
+ "%s:\n"
+ "%s",
+ object_id,
+ ex._info ().c_str ()));
+ return;
+ }
+
+ // Send state to each element in the replica_map_.
+ ReplicaGroup replica_group;
+
+ if (replica_map_.find (ACE_CString (object_id),
+ replica_group) != 0)
+ {
+ /*
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) SSA::state_changed () "
+ "could not find replicas for the application %s\n",
+ object_id));
+ */
+ return;
+ }
+
+ ReplicatedApplication_var replica;
+
+ for (REPLICA_OBJECT_LIST::iterator it = replica_group.replicas.begin ();
+ it != replica_group.replicas.end ();
+ ++it)
+ {
+ try
+ {
+ // Set the state on this replica.
+ (*it)->set_state (state.in ());
+ }
+ catch (const CORBA::SystemException& ex)
+ {
+ ACE_DEBUG ((LM_WARNING,
+ "(%P|%t) SSA::state_changed () "
+ "exception while contacting a "
+ "server replica for %s.\n",
+ object_id));
+ }
+ }
+}
+
+void
+StateSynchronizationAgent_i::update_rank_list (const RankList & rank_list)
+{
+ if (use_corba_)
+ {
+ // If only corba is used, we can simply reset the map.
+ replica_map_.close ();
+ replica_map_.open ();
+ }
+ else
+ {
+ // only remove entries replicated by CORBA
+ for (OBJECTID_REPLICA_MAP::iterator it = replica_map_.begin ();
+ it != replica_map_.end ();
+ ++it)
+ {
+ if (!it->item ().use_dds)
+ {
+ replica_map_.unbind (it);
+ }
+ }
+ }
+
+ ACE_DEBUG ((LM_INFO,
+ "SSA::update_rank_list with:\n"));
+
+ // for each replication group in the replica group list
+ for (size_t i = 0; i < rank_list.length (); ++i)
+ {
+ ACE_DEBUG ((LM_INFO,
+ "\toid = %s (%d entries)\n",
+ rank_list[i].object_id.in (),
+ rank_list[i].ior_list.length ()));
+
+ // Use the application id as a key for the map.
+ ACE_CString oid (rank_list[i].object_id);
+
+ // If there is already an entry for this object_id it means that
+ // we have a registered DDS object and should not override it.
+ if (replica_map_.find (oid) != 0)
+ {
+ // Create a new list for every replication group.
+ ReplicaGroup replica_group;
+ replica_group.use_dds = false;
+
+ // For each entry of a replica group...
+ for (size_t j = 0;
+ j < rank_list[i].ior_list.length ();
+ ++j)
+ {
+ try
+ {
+ // it is assumed that the strings identifying
+ // rank_list are stringified object references and
+ // can be resolved and used to contact the
+ // corresponding StateSynchronizationAgent.
+ replica_group.replicas.push_back (
+ STATEFUL_OBJECT_PTR (
+ new CorbaStateUpdate (
+ rank_list[i].ior_list[j])));
+ }
+ catch (const CORBA::SystemException& ex)
+ {
+ /*
+ ACE_DEBUG ((
+ LM_WARNING,
+ "(%P|%t) SSA::"
+ "update_rank_list could not resolve stringified "
+ "object reference for %s : %s\n",
+ oid.c_str (),
+ ex._info ().c_str ()));
+ */
+ }
+ }
+
+ // Add one replication group to the map.
+ replica_map_.bind (oid, replica_group);
+ }
+ }
+}
+
+void
+StateSynchronizationAgent_i::register_application (
+ const char * object_id,
+ ReplicatedApplication_ptr app)
+{
+ //ACE_DEBUG ((LM_TRACE,
+ // "SSA::register_application (%s) called.\n", object_id));
+
+ ACE_CString oid (object_id);
+
+ if (application_map_.bind (oid, ReplicatedApplication::_duplicate (app)) < 0)
+ {
+ ACE_DEBUG ((LM_WARNING,
+ "(%P|%t) SSA::register_application () "
+ "could not bind application %s to "
+ "the map successfully\n",
+ object_id));
+ }
+}
+
+#ifdef FLARE_USES_DDS
+
+bool
+StateSynchronizationAgent_i::create_participant (void)
+{
+ DDS::DomainParticipantFactory_var dpf
+ = DDS::DomainParticipantFactory::get_instance ();
+
+ if (CORBA::is_nil (dpf.in ()))
+ {
+ return false;
+ }
+
+ domain_participant_ =
+ dpf->create_participant (
+ domain_id_,
+ PARTICIPANT_QOS_DEFAULT,
+ DDS::DomainParticipantListener::_nil (),
+ DDS::ANY_STATUS);
+
+ if (CORBA::is_nil (domain_participant_.in ()))
+ {
+ return false;
+ }
+
+ return true;
+}
+
+bool
+StateSynchronizationAgent_i::delete_participant (void)
+{
+ DDS::DomainParticipantFactory_var dpf
+ = DDS::DomainParticipantFactory::get_instance ();
+
+ if (CORBA::is_nil (dpf.in ()))
+ {
+ return false;
+ }
+
+ DDS::ReturnCode_t status =
+ dpf->delete_participant (domain_participant_.in ());
+
+ if (status != DDS::RETCODE_OK)
+ {
+ return false;
+ }
+
+ return true;
+}
+
+bool
+StateSynchronizationAgent_i::create_publisher (void)
+{
+ DDS::PublisherQos pub_qos;
+ domain_participant_->get_default_publisher_qos (pub_qos);
+
+ publisher_ =
+ domain_participant_->create_publisher (
+ pub_qos,
+ DDS::PublisherListener::_nil (),
+ DDS::ANY_STATUS);
+
+ if (CORBA::is_nil (publisher_.in ()))
+ {
+ return false;
+ }
+
+ return true;
+}
+
+bool
+StateSynchronizationAgent_i::delete_publisher (void)
+{
+ DDS::ReturnCode_t status =
+ domain_participant_->delete_publisher (publisher_.in ());
+
+ if (status != DDS::RETCODE_OK)
+ {
+ return false;
+ }
+
+ return true;
+}
+
+bool
+StateSynchronizationAgent_i::create_subscriber (void)
+{
+ subscriber_ =
+ domain_participant_->create_subscriber (
+ SUBSCRIBER_QOS_DEFAULT,
+ DDS::SubscriberListener::_nil (),
+ DDS::ANY_STATUS);
+
+ if (CORBA::is_nil (subscriber_.in ()))
+ {
+ return false;
+ }
+
+ return true;
+}
+
+bool
+StateSynchronizationAgent_i::delete_subscriber (void)
+{
+ DDS::ReturnCode_t status =
+ domain_participant_->delete_subscriber (subscriber_.in ());
+
+ if (status != DDS::RETCODE_OK)
+ {
+ return false;
+ }
+
+ return true;
+}
+
+#endif /* FLARE_USES_DDS */
+
+std::string
+StateSynchronizationAgent_i::get_unique_id (
+ const std::string & app_name)
+{
+ std::string unique_id (app_name);
+
+ // Make name unique by adding host and process id.
+ unique_id += "_" + host_id_ + "_" + process_id_;
+
+ return unique_id;
+}
+