summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwilson_d <wilson_d@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-10-29 16:10:47 +0000
committerwilson_d <wilson_d@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-10-29 16:10:47 +0000
commit50e18994616f637b3d5444594da771f2318cc0d1 (patch)
tree19f0ddc17f1db668d1c60dd7b7fade30a7b53e7c
parent6b91b5bcda830a14261024b894a483d28b290328 (diff)
downloadATCD-pnotify_branch.tar.gz
ChangeLogTag: Fri Oct 29 10:53:56 2004 Dale Wilson <wilson_d@ociweb.com>pnotify_branch
-rw-r--r--TAO/ChangeLog_pnotify39
-rw-r--r--TAO/orbsvcs/Notify_Service/Notify_Service.cpp147
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Admin.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp6
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Builder.cpp7
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp5
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.h4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.inl9
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp47
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.h6
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp12
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.h1
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp6
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Reconnection_Registry.cpp22
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.h7
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp34
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp6
-rw-r--r--TAO/orbsvcs/tests/Notify/Reconnecting/Consumer.cpp188
-rw-r--r--TAO/orbsvcs/tests/Notify/Reconnecting/Supplier.cpp67
24 files changed, 400 insertions, 231 deletions
diff --git a/TAO/ChangeLog_pnotify b/TAO/ChangeLog_pnotify
index 4009c8f5089..44b45852664 100644
--- a/TAO/ChangeLog_pnotify
+++ b/TAO/ChangeLog_pnotify
@@ -1,3 +1,42 @@
+Fri Oct 29 10:53:56 2004 Dale Wilson <wilson_d@ociweb.com>
+
+ * orbsvcs/Notify_Service/Notify_Service.cpp:
+ Don't write IOR file until Notification Service
+ is *really* ready to run.
+
+ * orbsvcs/orbsvcs/Notify/Admin.cpp:
+ * orbsvcs/orbsvcs/Notify/Builder.cpp:
+ * orbsvcs/orbsvcs/Notify/Consumer.cpp:
+ * orbsvcs/orbsvcs/Notify/Delivery_Request.h:
+ * orbsvcs/orbsvcs/Notify/Delivery_Request.inl:
+ * orbsvcs/orbsvcs/Notify/EventChannelFactory.h:
+ * orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp:
+ * orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp:
+ * orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.h:
+ * orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp:
+ * orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp:
+ * orbsvcs/orbsvcs/Notify/Random_File.cpp:
+ * orbsvcs/orbsvcs/Notify/Reconnection_Registry.cpp:
+ * orbsvcs/orbsvcs/Notify/Routing_Slip.h:
+ * orbsvcs/orbsvcs/Notify/Routing_Slip.cpp:
+ * orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp:
+
+ * orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp:
+ * orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h:
+ * orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp:
+ * orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp:
+
+ Add Event reloading and restarting. Passes Reconnection test.
+ There is a lot of debug output still turned on so some housecleaning
+ is definately in order.
+
+
+ * orbsvcs/tests/Notify/Reconnecting/Consumer.cpp:
+ * orbsvcs/tests/Notify/Reconnecting/Supplier.cpp:
+ Add additional diagnostic information to track down problems
+ revealed by test.
+
+
Wed Oct 27 11:59:01 2004 Dale Wilson <wilson_d@ociweb.com>
* orbsvcs/orbsvcs/CosNotification.mpc:
diff --git a/TAO/orbsvcs/Notify_Service/Notify_Service.cpp b/TAO/orbsvcs/Notify_Service/Notify_Service.cpp
index 0e1eaf79ed2..12f7fdd7ea7 100644
--- a/TAO/orbsvcs/Notify_Service/Notify_Service.cpp
+++ b/TAO/orbsvcs/Notify_Service/Notify_Service.cpp
@@ -10,7 +10,7 @@
#include "ace/Argv_Type_Converter.h"
#include "tao/ORB_Core.h"
#include "ace/Dynamic_Service.h"
-#include "../orbsvcs/Notify/Service.h"
+#include "orbsvcs/orbsvcs/Notify/Service.h"
TAO_Notify_Service_Driver::TAO_Notify_Service_Driver (void)
: notify_service_ (0),
@@ -90,7 +90,7 @@ TAO_Notify_Service_Driver::init (int argc, ACE_TCHAR *argv[]
if (this->notify_service_ == 0)
{
- ACE_DEBUG ((LM_DEBUG, "Service not found! check conf. file\n"));
+ ACE_DEBUG ((LM_DEBUG, "Service not found! check conf.file\n"));
return -1;
}
@@ -139,24 +139,6 @@ TAO_Notify_Service_Driver::init (int argc, ACE_TCHAR *argv[]
ACE_ASSERT (!CORBA::is_nil (this->notify_factory_.in ()));
- // Write IOR to a file, if asked.
- CORBA::String_var str =
- this->orb_->object_to_string (this->notify_factory_.in () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
-
- if (this->ior_output_file_)
- {
- ACE_OS::fprintf (this->ior_output_file_,
- "%s",
- str.in ());
- ACE_OS::fclose (this->ior_output_file_);
- this->ior_output_file_ = 0;
- }
- else if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "The Notification Event Channel Factory IOR is <%s>\n",
- str.in ()));
-
// Make it bootstrappable, if asked.
if (this->bootstrap_)
{
@@ -184,84 +166,103 @@ TAO_Notify_Service_Driver::init (int argc, ACE_TCHAR *argv[]
}
}
+
// Register with the Name service, if asked
if (this->use_name_svc_)
- {
- // Register the Factory
- ACE_ASSERT (!CORBA::is_nil (this->naming_.in ()));
+ {
+ // Register the Factory
+ ACE_ASSERT (!CORBA::is_nil (this->naming_.in ()));
#if defined (TAO_NOTIFY_USE_NAMING_CONTEXT)
- CosNaming::Name name (1);
- name.length (1);
- name[0].id =
- CORBA::string_dup (this->notify_factory_name_.c_str ());
+ CosNaming::Name name (1);
+ name.length (1);
+ name[0].id =
+ CORBA::string_dup (this->notify_factory_name_.c_str ());
#else
- CosNaming::Name_var name =
- this->naming_->to_name (this->notify_factory_name_.c_str ()
- ACE_ENV_ARG_PARAMETER);
+ CosNaming::Name_var name =
+ this->naming_->to_name (this->notify_factory_name_.c_str ()
+ ACE_ENV_ARG_PARAMETER);
#endif /* TAO_NOTIFY_USE_NAMING_CONTEXT */
- ACE_CHECK_RETURN (-1);
+ ACE_CHECK_RETURN (-1);
#if defined (TAO_NOTIFY_USE_NAMING_CONTEXT)
- this->naming_->rebind (name,
- this->notify_factory_.in ()
- ACE_ENV_ARG_PARAMETER);
+ this->naming_->rebind (name,
+ this->notify_factory_.in ()
+ ACE_ENV_ARG_PARAMETER);
#else
- this->naming_->rebind (name.in (),
- this->notify_factory_.in ()
- ACE_ENV_ARG_PARAMETER);
+ this->naming_->rebind (name.in (),
+ this->notify_factory_.in ()
+ ACE_ENV_ARG_PARAMETER);
#endif /* TAO_NOTIFY_USE_NAMING_CONTEXT */
- ACE_CHECK_RETURN (-1);
+ ACE_CHECK_RETURN (-1);
- ACE_DEBUG ((LM_DEBUG,
- "Registered with the naming service as: %s\n",
- this->notify_factory_name_.c_str()));
-
- if (this->register_event_channel_ == 1)
- {
- // create an event channel
- CosNotifyChannelAdmin::ChannelID id;
-
- CosNotification::QoSProperties initial_qos;
- CosNotification::AdminProperties initial_admin;
-
- CosNotifyChannelAdmin::EventChannel_var ec =
- this->notify_factory_->create_channel (initial_qos,
- initial_admin,
- id
- ACE_ENV_ARG_PARAMETER);
+ ACE_DEBUG ((LM_DEBUG,
+ "Registered with the naming service as: %s\n",
+ this->notify_factory_name_.c_str()));
+
+ if (this->register_event_channel_ == 1)
+ {
+ // create an event channel
+ CosNotifyChannelAdmin::ChannelID id;
+
+ CosNotification::QoSProperties initial_qos;
+ CosNotification::AdminProperties initial_admin;
+
+ CosNotifyChannelAdmin::EventChannel_var ec =
+ this->notify_factory_->create_channel (initial_qos,
+ initial_admin,
+ id
+ ACE_ENV_ARG_PARAMETER);
#if defined (TAO_NOTIFY_USE_NAMING_CONTEXT)
- name[0].id =
- CORBA::string_dup (this->notify_channel_name_.c_str ());
+ name[0].id =
+ CORBA::string_dup (this->notify_channel_name_.c_str ());
#else
- name = this->naming_->to_name (
- this->notify_channel_name_.c_str ()
- ACE_ENV_ARG_PARAMETER);
+ name = this->naming_->to_name (
+ this->notify_channel_name_.c_str ()
+ ACE_ENV_ARG_PARAMETER);
#endif /* TAO_NOTIFY_USE_NAMING_CONTEXT */
- ACE_CHECK_RETURN (-1);
-
+ ACE_CHECK_RETURN (-1);
#if defined (TAO_NOTIFY_USE_NAMING_CONTEXT)
- this->naming_->rebind (name,
- ec.in ()
- ACE_ENV_ARG_PARAMETER);
+ this->naming_->rebind (name,
+ ec.in ()
+ ACE_ENV_ARG_PARAMETER);
#else
- this->naming_->rebind (name.in (),
- ec.in ()
- ACE_ENV_ARG_PARAMETER);
+ this->naming_->rebind (name.in (),
+ ec.in ()
+ ACE_ENV_ARG_PARAMETER);
#endif /* TAO_NOTIFY_USE_NAMING_CONTEXT */
- ACE_CHECK_RETURN (-1);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Registered an Event Channel with the naming service as: %s\n",
+ this->notify_channel_name_.c_str()));
+
+ }
+ }
- ACE_DEBUG ((LM_DEBUG,
- "Registered an Event Channel with the naming service as: %s\n",
- this->notify_channel_name_.c_str()));
+ // Write IOR to a file, if asked.
+ // Note: do this last to ensure that we're up and running before the file is written
+ CORBA::String_var str =
+ this->orb_->object_to_string (this->notify_factory_.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
- }
+ if (this->ior_output_file_)
+ {
+ ACE_OS::fprintf (this->ior_output_file_,
+ "%s",
+ str.in ());
+ ACE_OS::fclose (this->ior_output_file_);
+ this->ior_output_file_ = 0;
}
+ else if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "The Notification Event Channel Factory IOR is <%s>\n",
+ str.in ()));
return 0;
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp b/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp
index 04d5b4151d3..c09b42e1bee 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp
@@ -159,7 +159,7 @@ TAO_Notify_Admin::load_attrs(const TAO_Notify::NVPList& attrs)
}
if (attrs.find ("default", value))
{
- this->is_default_ = ACE_OS::strcmp (value, "yes");
+ this->is_default_ = (ACE_OS::strcmp (value, "yes") == 0);
}
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
index 42d97901752..e6958fdb4aa 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
@@ -100,5 +100,9 @@ void
TAO_Notify_PushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer
ACE_ENV_ARG_DECL)
{
- int todo_reconnect;
+ TAO_Notify_PushConsumer* tmp = ACE_dynamic_cast(TAO_Notify_PushConsumer*, old_consumer);
+ ACE_ASSERT(tmp != 0);
+ this->init(tmp->push_consumer_.in() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ this->schedule_timer(false);
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp b/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp
index 1c185e7dd4a..5aa8b9c2390 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp
@@ -211,12 +211,9 @@ TAO_Notify_Builder::build_event_channel_factory (PortableServer::POA_ptr poa ACE
// PortableServer::ServantBase_var servant_var (ecf);
- ecf->TAO_Notify_EventChannelFactory::init (poa ACE_ENV_ARG_PARAMETER);
-
- CORBA::Object_var obj = ecf->activate (ecf ACE_ENV_ARG_PARAMETER);
+ ecf->init (poa ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (ecf_ret._retn ());
-
- ecf_ret = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in() ACE_ENV_ARG_PARAMETER);
+ ecf_ret = ecf->activate_self (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (ecf_ret._retn ());
return (ecf_ret._retn ());
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
index f7b23e7d7b6..ff9b0a58e60 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
@@ -58,14 +58,14 @@ TAO_Notify_Consumer::proxy (void)
return this->proxy_supplier ();
}
-//@@ todo: consider buffering strategy
void
TAO_Notify_Consumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties)
{
this->max_batch_size_ = qos_properties.maximum_batch_size ();
/*
-if (this->max_batch_size_.is_valid ())
+//@@ todo: consider buffering strategy
+ if (this->max_batch_size_.is_valid ())
{// set the max batch size.
this->buffering_strategy_->batch_size (this->max_batch_size_.value ());
}
@@ -346,6 +346,7 @@ TAO_Notify_Consumer::dispatch_batch (const CosNotification::EventBatch& batch)
ex._info ().c_str ()
));
// @@todo what to return here?
+ return DISPATCH_RETRY;
}
ACE_CATCHANY
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.h b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.h
index 6bdf5c08496..235a5a68f85 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.h
@@ -17,6 +17,7 @@
// Forward declarations of referenced classes
class TAO_Notify_EventChannelFactory;
+class TAO_Notify_ProxySupplier;
namespace TAO_Notify
{
@@ -98,6 +99,9 @@ public:
/// expose routing slip method
bool should_retry () const;
+ /// expose routing slip method
+ void dispatch (TAO_Notify_ProxySupplier * proxy_supplier, bool filter ACE_ENV_ARG_DECL);
+
// Meaningless, but needed by ACE_Vector on some platforms (gcc2.x LynxOS)
bool operator == (const Delivery_Request & rhs) const;
// Meaningless, but needed by ACE_Vector on some platforms
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.inl b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.inl
index c8b279b908d..7ebe5e07392 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.inl
+++ b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.inl
@@ -40,6 +40,15 @@ Delivery_Request::should_retry () const
return this->routing_slip_->should_retry ();
}
+void
+Delivery_Request::dispatch (
+ TAO_Notify_ProxySupplier * proxy_supplier,
+ bool filter ACE_ENV_ARG_DECL)
+{
+ this->routing_slip_->dispatch (proxy_supplier, filter ACE_ENV_ARG_PARAMETER);
+}
+
+
} // namespace TAO_Notify
diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp
index ad1b3a8661f..14f84bb280e 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp
@@ -288,8 +288,6 @@ TAO_Notify_EventChannelFactory::save_persistent (TAO_Notify::Topology_Saver& sav
void
TAO_Notify_EventChannelFactory::load_event_persistence (ACE_ENV_SINGLE_ARG_DECL)
{
-#define EVENT_PERISISTENCE_SUPPORT //@@todo
-#ifdef EVENT_PERISISTENCE_SUPPORT //@@todo
TAO_Notify::Event_Persistence_Strategy * strategy =
ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
if (strategy != 0)
@@ -329,9 +327,6 @@ TAO_Notify_EventChannelFactory::load_event_persistence (ACE_ENV_SINGLE_ARG_DECL)
ACE_CHECK;
}
}
-#else //EVENT_PERISISTENCE_SUPPORT //@@todo
-#pragma message ("TODO: EVENT_PERISISTENCE_SUPPORT")
-#endif //EVENT_PERISISTENCE_SUPPORT //@@todo
}
bool
@@ -411,13 +406,8 @@ TAO_Notify_EventChannelFactory::reconnect (ACE_ENV_SINGLE_ARG_DECL)
ACE_CHECK;
// Then send reconnection announcement to registered clients
- CORBA::Object_var obj = this->poa()->id_to_reference (this->id () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- CosNotifyChannelAdmin::EventChannelFactory_var this_reference = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in ());
- ACE_ASSERT (CORBA::is_nil (this_reference.in ()));
- // todo: Is there an easier way?
- this->reconnect_registry_.send_reconnect (this_reference.in () ACE_ENV_ARG_PARAMETER);
+ ACE_ASSERT (!CORBA::is_nil (this->channel_factory_.in ()));
+ this->reconnect_registry_.send_reconnect (this->channel_factory_.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
// reactivate events in-progress
@@ -525,6 +515,39 @@ TAO_Notify_EventChannelFactory::find_proxy_supplier (TAO_Notify::IdVec & id_path
return result;
}
+CosNotifyChannelAdmin::EventChannelFactory_ptr
+TAO_Notify_EventChannelFactory::activate_self (ACE_ENV_SINGLE_ARG_DECL)
+{
+ CORBA::Object_var obj = this->activate (this ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannelFactory::_nil ());
+ this->channel_factory_
+ = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in() ACE_ENV_ARG_PARAMETER);
+ CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in() ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_NEW_ENV
+ {
+ if (DEBUG_LEVEL > 9)
+ {
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) TAO_Notify_EventChannelFactory::activate_self") ));
+ }
+ this->reconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ // ignore for now
+ }
+ ACE_ENDTRY;
+ return this->channel_factory_._retn();
+}
+
+
+TAO_Notify_Object::ID
+TAO_Notify_EventChannelFactory::get_id () const
+{
+ return id();
+}
+
+
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel
diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.h b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.h
index c2eecb3c863..41af5f203bc 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.h
@@ -100,6 +100,7 @@ public:
CORBA::Long id,
const TAO_Notify::NVPList& attrs
ACE_ENV_ARG_DECL);
+ CosNotifyChannelAdmin::EventChannelFactory_ptr activate_self (ACE_ENV_SINGLE_ARG_DECL);
virtual void reconnect (ACE_ENV_SINGLE_ARG_DECL);
/// handle change notifications
@@ -113,7 +114,7 @@ public:
TAO_Notify_ProxyConsumer * find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position ACE_ENV_ARG_DECL);
TAO_Notify_ProxySupplier * find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position ACE_ENV_ARG_DECL);
TAO_Notify_Object * follow_id_path (TAO_Notify::IdVec & id_path, size_t position ACE_ENV_ARG_DECL);
- virtual TAO_Notify_Object::ID get_id () const {return id();} // @@todo move to INL
+ virtual TAO_Notify_Object::ID get_id () const;
protected:
@@ -176,6 +177,9 @@ public:
private:
TAO_SYNCH_MUTEX topology_save_lock_;
+
+ CosNotifyChannelAdmin::EventChannelFactory_var channel_factory_;
+
/// change-in-progress detector to avoid duplicates
short topology_save_seq_;
TAO_Notify::Topology_Factory* topology_factory_;
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp
index 57c4528586b..14c22d871cc 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp
@@ -37,8 +37,16 @@ TAO_Notify_Method_Request_Lookup::work (
TAO_Notify_ProxySupplier* proxy_supplier
ACE_ENV_ARG_DECL)
{
- TAO_Notify_Method_Request_Dispatch_No_Copy request (*this, proxy_supplier, true);
- proxy_supplier->deliver (request ACE_ENV_ARG_PARAMETER);
+ if (delivery_request_.get () == 0)
+ {
+ TAO_Notify_Method_Request_Dispatch_No_Copy request (*this, proxy_supplier, true);
+ proxy_supplier->deliver (request ACE_ENV_ARG_PARAMETER);
+ }
+ else
+ {
+ delivery_request_->dispatch (proxy_supplier, true ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
}
TAO_Notify_Method_Request_Lookup::execute_i (ACE_ENV_SINGLE_ARG_DECL)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp
index 24f960f74f4..c8e21dac7d9 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp
@@ -6,7 +6,9 @@
#include "ace/OS_NS_string.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
namespace TAO_Notify
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.h b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.h
index 15b1e4eb713..97ea0556f36 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.h
@@ -120,6 +120,7 @@ private:
* blocks, reads, and writes. This class also manages a thread that performs
* background updating of a Random_File.
* @@todo this is too much for one class to do. It should be refactored.
+ * @@todo: we shouldn't arbitrarily use a thread.
*/
class TAO_Notify_Serv_Export Persistent_File_Allocator
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp
index 77c6bad51dd..1d95752eea9 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp
@@ -21,7 +21,7 @@ ACE_RCSID(RT_Notify, TAO_Notify_ProxyConsumer, "$Id$")
#include "SupplierAdmin.h"
#include "EventChannel.h"
#include "Routing_Slip.h"
-#define DEBUG_LEVEL 10
+//#define DEBUG_LEVEL 10
#ifndef DEBUG_LEVEL
# define DEBUG_LEVEL TAO_debug_level
#endif //DEBUG_LEVEL
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp b/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp
index 3542a10690d..9fbb6e11c20 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp
@@ -4,8 +4,10 @@
#include "ace/OS.h"
#include <tao/debug.h>
-//#define DEBUG_LEVEL 9 // uncomment to force debug messages
-#define DEBUG_LEVEL TAO_debug_level // coment to force debug messages
+//#define DEBUG_LEVEL 9
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
namespace TAO_Notify
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Reconnection_Registry.cpp b/TAO/orbsvcs/orbsvcs/Notify/Reconnection_Registry.cpp
index a074eca09c8..322cc463d45 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Reconnection_Registry.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Reconnection_Registry.cpp
@@ -17,6 +17,11 @@
#include "Properties.h"
#include "Topology_Saver.h"
#include <ace/Vector_T.h>
+//#define DEBUG_LEVEL 10
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif
+
namespace TAO_Notify
{
Reconnection_Registry::Reconnection_Registry (Topology_Parent & parent)
@@ -41,7 +46,7 @@ namespace TAO_Notify
//@@todo DO WE NEED THREAD SAFENESS?
NotifyExt::ReconnectionRegistry::ReconnectionID next_id = ++highest_id_;
- if (TAO_debug_level > 0)
+ if (DEBUG_LEVEL > 0)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Reconnect registry: registering %d\n"),
@@ -68,7 +73,7 @@ namespace TAO_Notify
Reconnection_Registry::unregister_callback (NotifyExt::ReconnectionRegistry::ReconnectionID id
ACE_ENV_ARG_DECL)
{
- if (TAO_debug_level > 0)
+ if (DEBUG_LEVEL > 0)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Reconnect registry: unregistering %d\n"),
@@ -109,7 +114,7 @@ namespace TAO_Notify
iter.advance ())
{
NVPList cattrs;
- if (TAO_debug_level > 0)
+ if (DEBUG_LEVEL > 0)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Reconnect registry: saving %d\n"),
@@ -150,7 +155,7 @@ namespace TAO_Notify
{
highest_id_ = id;
- if (TAO_debug_level > 0)
+ if (DEBUG_LEVEL > 0)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Reconnect registry: reloading %d\n"),
@@ -188,7 +193,7 @@ namespace TAO_Notify
{
ACE_TRY_NEW_ENV
{
- if (TAO_debug_level > 0)
+ if (DEBUG_LEVEL > 0)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Reconnection Registry: Sending reconnection to client %d\n"),
@@ -251,9 +256,6 @@ template class ACE_Hash_Map_Manager_Ex<NotifyExt::ReconnectionRegistry::Reconnec
template class ACE_Hash_Map_Iterator_Base_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex>;
-// vector of int's is instantiated elsewhere
-//template class ACE_Vector<NotifyExt::ReconnectionRegistry::ReconnectionID>;
-//template class ACE_Array_Base<NotifyExt::ReconnectionRegistry::ReconnectionID>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Hash_Map_Entry<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString >
@@ -261,7 +263,5 @@ template class ACE_Hash_Map_Reverse_Iterator_Ex<NotifyExt::ReconnectionRegistry:
#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString ,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString ,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString ,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex>
-// vector of int's is instantiated elsewhere
-//#pragma instantiate ACE_Vector<NotifyExt::ReconnectionRegistry::ReconnectionID>
-//#pragma instantiate ACE_Array_Base<NotifyExt::ReconnectionRegistry::ReconnectionID>
+
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp
index d64e96d8ecf..1f86e0699d9 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp
@@ -19,7 +19,9 @@
#include "Method_Request_Dispatch.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
#define QUEUE_ALLOWED 1
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.h b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.h
index 9e57fde894a..92f21cb075a 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.h
@@ -96,13 +96,6 @@ public:
void dispatch (TAO_Notify_ProxySupplier * proxy_supplier, bool filter ACE_ENV_ARG_DECL);
-#if 0
- /// \brief Forward delivery to a consumer via a proxy supplier
- /// \param proxy_supplier the proxy supplier that will deliver the event
- /// \param filter should consumer-based filtering be applied?
- void forward (TAO_Notify_ProxySupplier* ps, bool filter);
-#endif
-
/////////////////////////////////////////
/// \brief Wait until the event/routing_slip has
/// been saved at least once.
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp
index 93cde74a63e..10f316f4e7c 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp
@@ -10,7 +10,9 @@
#include "ace/Dynamic_Service.h"
//#define DEBUG_LEVEL 9
-#define DEBUG_LEVEL TAO_debug_level
+#ifndef DEBUG_LEVEL
+# define DEBUG_LEVEL TAO_debug_level
+#endif //DEBUG_LEVEL
namespace TAO_Notify
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
index 99116aa75a2..e21a8dae22a 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
@@ -34,21 +34,38 @@ TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()
}
void
-TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL)
+TAO_Notify_SequencePushConsumer::init (
+ CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties
+#if 1
+ ACE_ENV_ARG_DECL_NOT_USED)
+#else //1
+ ACE_ENV_ARG_DECL)
+#endif
{
- this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
+ set_consumer (push_consumer);
- this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
+#if 1 //// @@ TODO: use buffering strategy in TAO_Notify_Consumer???
+ ACE_UNUSED_ARG ( admin_properties);
+#else //1
-/* @@ TODO: use buffering strategy in TAO_Notify_Consumer???
ACE_NEW_THROW_EX (this->buffering_strategy_,
TAO_Notify_Batch_Buffering_Strategy (this->msg_queue_, admin_properties,
this->max_batch_size_.value ()),
CORBA::NO_MEMORY ());
-*/
+#endif // 1
}
void
+TAO_Notify_SequencePushConsumer::set_consumer (
+ CosNotifyComm::SequencePushConsumer_ptr push_consumer)
+{
+ this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
+ this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
+
+}
+
+
+void
TAO_Notify_SequencePushConsumer::release (void)
{
delete this;
@@ -225,7 +242,10 @@ TAO_Notify_SequencePushConsumer::get_ior (ACE_CString & iorstr) const
void
TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer
- ACE_ENV_ARG_DECL)
+ ACE_ENV_ARG_DECL_NOT_USED)
{
- int todo_reconnect;
+ TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer);
+ ACE_ASSERT(tmp != 0);
+ this->set_consumer(tmp->push_consumer_.in());
+ this->schedule_timer(false);
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h
index 7397661c224..e2c39d71b43 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h
@@ -51,6 +51,8 @@ public:
/// Init the Consumer
void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL);
+ void set_consumer (CosNotifyComm::SequencePushConsumer_ptr push_consumer);
+
/// TAO_Notify_Destroy_Callback methods.
virtual void release (void);
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
index ec38cd3203d..01bb530cf57 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
@@ -65,7 +65,11 @@ void
TAO_Notify_StructuredPushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer
ACE_ENV_ARG_DECL)
{
- int todo_reconnect;
+ TAO_Notify_StructuredPushConsumer* tmp = dynamic_cast<TAO_Notify_StructuredPushConsumer *> (old_consumer);
+ ACE_ASSERT(tmp != 0);
+ this->init(tmp->push_consumer_.in() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ this->schedule_timer(false);
}
bool
diff --git a/TAO/orbsvcs/tests/Notify/Reconnecting/Consumer.cpp b/TAO/orbsvcs/tests/Notify/Reconnecting/Consumer.cpp
index f03f1dfa917..3ac4643c225 100644
--- a/TAO/orbsvcs/tests/Notify/Reconnecting/Consumer.cpp
+++ b/TAO/orbsvcs/tests/Notify/Reconnecting/Consumer.cpp
@@ -1070,11 +1070,11 @@ Consumer_Main::load_ids()
{
int field = 0;
- char buffer[100];
+ char buffer[100] = ""; // because ACE fgets doesn't put a null if the file is empty
ACE_OS::fgets (buffer, sizeof(buffer), idf);
ACE_OS::fclose (idf);
char * pb = buffer;
- while (*pb != 0)
+ while (!ok && *pb != 0)
{
char * eb = ACE_OS::strchr (pb, ',');
char * nb = eb + 1;
@@ -1083,14 +1083,11 @@ Consumer_Main::load_ids()
eb = pb + ACE_OS::strlen (pb);
nb = eb;
}
- else
- {
- *eb = 0;
- }
+ *eb = 0;
if (pb < eb)
{
int value = ACE_OS::atoi(pb);
- switch (field)
+ switch (++field)
{
case 1:
this->mode_ = static_cast<Mode_T> (value);
@@ -1291,7 +1288,7 @@ Consumer_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL)
if (this->verbose_)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) supplier: Connect to Existing event channel %d\n"),
+ ACE_TEXT ("(%P|%t) Consumer: Connect to Existing event channel %d\n"),
ACE_static_cast (int, this->ec_id_)
));
}
@@ -1308,8 +1305,8 @@ Consumer_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL)
if (!ok)
{
- CosNotification::QoSProperties qosprops(7);
- qosprops.length(7);
+ CosNotification::QoSProperties qosprops (7);
+ qosprops.length (7);
CORBA::ULong i = 0;
#ifdef DISABLE_PROPERTIES_TODO
qosprops[i].name = CORBA::string_dup(CosNotification::EventReliability);
@@ -1329,7 +1326,7 @@ Consumer_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL)
#endif
qosprops.length (i);
CosNotification::AdminProperties adminprops(4);
- adminprops.length(4);
+ adminprops.length (4);
i = 0;
#ifdef DISABLE_PROPERTIES_TODO
adminprops[i].name = CORBA::string_dup(CosNotification::MaxQueueLength);
@@ -1349,7 +1346,6 @@ Consumer_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL)
this->ec_id_
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
-
ok = ! CORBA::is_nil (ec_.in ());
if (ok && this->verbose_)
{
@@ -1378,35 +1374,63 @@ void
Consumer_Main::init_consumer_admin (ACE_ENV_SINGLE_ARG_DECL)
{
bool ok = false;
- if (this->reconnecting_ && this->sa_id_ != default_admin_id)
+ if (this->reconnecting_)
{
- ACE_TRY_EX(ONE)
+ if (this->sa_id_ == default_admin_id)
{
- this->sa_ = this->ec_->get_consumeradmin(
- this->sa_id_
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK_EX(ONE);
- ok = ! CORBA::is_nil (this->sa_.in ());
- if (ok && this->verbose_)
+ ACE_TRY_EX(TWO)
{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) Consumer: Reconnect to consumer admin %d\n"),
- ACE_static_cast (int, this->sa_id_)
- ));
+ this->sa_ = this->ec_->default_consumer_admin (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK_EX(TWO);
+ ok = ! CORBA::is_nil (this->sa_.in ());
+ this->sa_id_ = default_admin_id;
+ if (ok && this->verbose_)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer: Using default consumer admin\n")
+ ));
+ }
+ else if (this->verbose_)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer: No default consumer admin\n")
+ ));
+ }
}
+ ACE_CATCHALL
+ {
+ }
+ ACE_ENDTRY;
}
- ACE_CATCHALL
+ else // not default admin
{
+ ACE_TRY_EX(ONE)
+ {
+ this->sa_ = this->ec_->get_consumeradmin(
+ this->sa_id_
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK_EX(ONE);
+ ok = ! CORBA::is_nil (this->sa_.in ());
+ if (ok && this->verbose_)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer: Reconnect to consumer admin %d\n"),
+ ACE_static_cast (int, this->sa_id_)
+ ));
+ }
+ }
+ ACE_CATCHALL
+ {
+ }
+ ACE_ENDTRY;
}
- ACE_ENDTRY;
}
-
- if (!ok)
+ else // !reconnecting
{
- ACE_TRY_EX(TWO)
+ ACE_TRY_EX(THREE)
{
this->sa_ = this->ec_->default_consumer_admin (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK_EX(TWO);
+ ACE_TRY_CHECK_EX(THREE);
ok = ! CORBA::is_nil (this->sa_.in ());
this->sa_id_ = default_admin_id;
if (ok && this->verbose_)
@@ -1426,45 +1450,51 @@ Consumer_Main::init_consumer_admin (ACE_ENV_SINGLE_ARG_DECL)
{
}
ACE_ENDTRY;
- }
- if (!ok)
- {
- this->sa_ = this->ec_->new_for_consumers(
- CosNotifyChannelAdmin::OR_OP,
- this->sa_id_
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- ok = ! CORBA::is_nil (this->sa_.in ());
-
-#ifdef TEST_SET_QOS
- // temporary: be sure we can set qos properties here
- if (ok)
+ if (!ok)
{
- CosNotification::QoSProperties qosprops(2);
- CORBA::ULong i = 0;
- qosprops.length(2);
-
- qosprops[i].name = CORBA::string_dup(CosNotification::EventReliability);
- qosprops[i++].value <<= CosNotification::Persistent;
- qosprops[i].name = CORBA::string_dup(CosNotification::ConnectionReliability);
- qosprops[i++].value <<= CosNotification::Persistent; // Required, or we won't persist much
- qosprops.length(i);
- this->sa_->set_qos (qosprops ACE_ENV_ARG_PARAMETER);
+ this->sa_ = this->ec_->new_for_consumers(
+ CosNotifyChannelAdmin::OR_OP,
+ this->sa_id_
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- }
+ ok = ! CORBA::is_nil (this->sa_.in ());
+
+#ifdef TEST_SET_QOS
+ // temporary: be sure we can set qos properties here
+ if (ok)
+ {
+ CosNotification::QoSProperties qosprops(2);
+ CORBA::ULong i = 0;
+ qosprops.length(2);
+
+ qosprops[i].name = CORBA::string_dup(CosNotification::EventReliability);
+ qosprops[i++].value <<= CosNotification::Persistent;
+ qosprops[i].name = CORBA::string_dup(CosNotification::ConnectionReliability);
+ qosprops[i++].value <<= CosNotification::Persistent; // Required, or we won't persist much
+ qosprops.length(i);
+ this->sa_->set_qos (qosprops ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
#endif
- if (ok && this->verbose_)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) Consumer: Create new consumer admin %d\n"),
- ACE_static_cast (int, this->sa_id_)
- ));
+ if (ok && this->verbose_)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer: Create new consumer admin %d\n"),
+ ACE_static_cast (int, this->sa_id_)
+ ));
+ }
}
}
+ if (!ok)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer: connect to consumer admin failed %d\n"),
+ ACE_static_cast (int, this->sa_id_)
+ ));
+ }
}
-
void
Consumer_Main::init_structured_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL)
{
@@ -1505,7 +1535,7 @@ Consumer_Main::init_structured_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL)
if (ok && this->verbose_)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) Consumer: Create new proxy %d\n"),
+ ACE_TEXT ("(%P|%t) Consumer: Create new structured proxy %d\n"),
ACE_static_cast (int, this->structured_proxy_id_)
));
}
@@ -1601,7 +1631,7 @@ Consumer_Main::init_sequence_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL)
if (ok && this->verbose_)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) Consumer: Create new proxy %d\n"),
+ ACE_TEXT ("(%P|%t) Consumer: Create new sequence proxy %d\n"),
ACE_static_cast (int, this->sequence_proxy_id_)
));
}
@@ -1692,9 +1722,28 @@ Consumer_Main::init_any_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL)
ACE_static_cast (int, this->any_proxy_id_)
));
}
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer: Get proxy supplier %d returned nil\n"),
+ ACE_static_cast (int, this->any_proxy_id_)
+ ));
+ }
+ }
+ ACE_CATCHANY
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer: Get proxy supplier %d threw exception\n"),
+ ACE_static_cast (int, this->any_proxy_id_)
+ ));
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, ACE_TEXT ("To wit:"));
}
ACE_CATCHALL
{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Consumer: Get proxy supplier %d threw exception\n"),
+ ACE_static_cast (int, this->any_proxy_id_)
+ ));
}
ACE_ENDTRY;
}
@@ -1708,18 +1757,10 @@ Consumer_Main::init_any_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL)
ACE_CHECK;
ok = ! CORBA::is_nil (proxy.in ());
-#ifdef TEST_SET_QOS
- // temporary
- if (ok)
- {
- set_proxy_qos (proxy.in () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- }
-#endif // TEST_SET_QOS
if (ok && this->verbose_)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) Consumer: Create new proxy\n"),
+ ACE_TEXT ("(%P|%t) Consumer: Create new Any proxy %d\n"),
ACE_static_cast (int, this->any_proxy_id_)
));
}
@@ -1732,7 +1773,7 @@ Consumer_Main::init_any_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL)
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Consumer: Received wrong type of push supplier proxy %d\n"),
- ACE_static_cast (int, this->sequence_proxy_id_)
+ ACE_static_cast (int, this->any_proxy_id_)
));
ACE_THROW (CORBA::BAD_PARAM());
}
@@ -1764,6 +1805,7 @@ Consumer_Main::init_any_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL)
this->any_push_consumer_ref_.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
+
this->any_push_consumer_.set_connected(true);
}
diff --git a/TAO/orbsvcs/tests/Notify/Reconnecting/Supplier.cpp b/TAO/orbsvcs/tests/Notify/Reconnecting/Supplier.cpp
index 5c0f2757a5b..88bfd7cdb3a 100644
--- a/TAO/orbsvcs/tests/Notify/Reconnecting/Supplier.cpp
+++ b/TAO/orbsvcs/tests/Notify/Reconnecting/Supplier.cpp
@@ -452,13 +452,13 @@ Supplier_Main::save_ids()
int imode = ACE_static_cast (int, this->mode_);
ACE_OS::fprintf (idf,
"%d,%d,%d,%d,%d,%d,%d,\n",
- static_cast <int> (imode),
- static_cast <int> (ec_id_),
- static_cast <int> (sa_id_),
- static_cast <int> (structured_proxy_id_),
- static_cast <int> (sequence_proxy_id_),
- static_cast <int> (any_proxy_id_),
- static_cast <int> (endflag) );
+ static_cast<int> (imode),
+ static_cast<int> (ec_id_),
+ static_cast<int> (sa_id_),
+ static_cast<int> (structured_proxy_id_),
+ static_cast<int> (sequence_proxy_id_),
+ static_cast<int> (any_proxy_id_),
+ static_cast<int> (endflag) );
ACE_OS::fclose (idf);
}
}
@@ -474,14 +474,12 @@ Supplier_Main::load_ids()
{
int field = 0;
- char buffer[100] = "";
+ char buffer[100] = ""; // because ACE fgets doesn't put a null if the file is empty
ACE_OS::fgets (buffer, sizeof(buffer), idf);
ACE_OS::fclose (idf);
-
char * pb = buffer;
while (!ok && *pb != 0)
{
-fprintf (stderr, "Buffer: %s\n" , pb);
char * eb = ACE_OS::strchr (pb, ',');
char * nb = eb + 1;
if (eb == 0)
@@ -490,11 +488,9 @@ fprintf (stderr, "Buffer: %s\n" , pb);
nb = eb;
}
*eb = 0;
-fprintf (stderr, "PARSE: [%s]\n", pb);
if (pb < eb)
{
int value = ACE_OS::atoi(pb);
-fprintf (stderr, "field[%d], %d\n", field, value);
switch (++field)
{
case 1:
@@ -657,6 +653,8 @@ Supplier_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL)
ACE_ENDTRY;
}
+ // if we don't have a channel yet, and a channel id file was specified
+ // try to read from it
if (!ok && this->channel_file_.length () > 0)
{
FILE * chf = ACE_OS::fopen (this->channel_file_.c_str (), "r");
@@ -675,12 +673,17 @@ Supplier_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL)
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK_EX (unique_label_1)
ok = ! CORBA::is_nil (this->ec_.in ());
- if (ok && this->verbose_)
+ if (ok)
{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) supplier: Connect to Consumer's event channel %d\n"),
- ACE_static_cast (int, this->ec_id_)
- ));
+ if (this->verbose_)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Supplier: Connect to Existing event channel %d\n"),
+ ACE_static_cast (int, this->ec_id_)
+ ));
+ }
+ // kill the channel filename so we don't overwrite the file
+ this->channel_file_ = "";
}
}
ACE_CATCHALL
@@ -692,11 +695,10 @@ Supplier_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL)
if (!ok)
{
- CosNotification::QoSProperties qosprops(7);
- qosprops.length(7);
-
+ CosNotification::QoSProperties qosprops (7);
+ qosprops.length (7);
CORBA::ULong i = 0;
-#ifdef DISABLE_PROPERITIES_TODO
+#ifdef DISABLE_PROPERTIES_TODO
qosprops[i].name = CORBA::string_dup(CosNotification::EventReliability);
qosprops[i++].value <<= CosNotification::Persistent;
qosprops[i].name = CORBA::string_dup(CosNotification::ConnectionReliability);
@@ -712,10 +714,9 @@ Supplier_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL)
qosprops[i].name = CORBA::string_dup(CosNotification::PacingInterval);
qosprops[i++].value <<= (TimeBase::TimeT) 50 * 10000; // 50ms
#endif
- qosprops.length(i);
-
+ qosprops.length (i);
CosNotification::AdminProperties adminprops(4);
- adminprops.length(4);
+ adminprops.length (4);
i = 0;
#ifdef DISABLE_PROPERTIES_TODO
adminprops[i].name = CORBA::string_dup(CosNotification::MaxQueueLength);
@@ -740,10 +741,21 @@ Supplier_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Supplier: Create event channel %d\n"),
- ACE_static_cast (int, this->ec_id_)
+ static_cast<int> (this->ec_id_)
));
}
}
+
+ // save channel id
+ if (ok && this->channel_file_.length() > 0)
+ {
+ FILE * chf = ACE_OS::fopen (this->channel_file_.c_str (), "w");
+ if (chf != 0)
+ {
+ fprintf (chf, "%d\n", static_cast<int> (this->ec_id_));
+ fclose (chf);
+ }
+ }
}
CosNotifyChannelAdmin::AdminID default_admin_id = ACE_static_cast (CosNotifyChannelAdmin::AdminID, -1);
@@ -1033,7 +1045,7 @@ Supplier_Main::init_any_proxy_consumer (ACE_ENV_SINGLE_ARG_DECL)
if (ok && this->verbose_)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) Supplier: Create new proxy\n"),
+ ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"),
ACE_static_cast (int, this->any_proxy_id_)
));
}
@@ -1083,9 +1095,6 @@ Supplier_Main::init_any_proxy_consumer (ACE_ENV_SINGLE_ARG_DECL)
int Supplier_Main::fini (ACE_ENV_SINGLE_ARG_DECL)
{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) supplier::fini\n")
- ));
if (this->disconnect_on_exit_)
{
this->reconnection_callback_.fini (ACE_ENV_SINGLE_ARG_PARAMETER);