summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
blob: 6372d121caa1902793581e4c368d1e8140416cff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// $Id$

#include "orbsvcs/Notify/Any/PushConsumer.h"
#include "ace/Bound_Ptr.h"
#include "tao/Stub.h" // For debug messages printing out ORBid.
#include "tao/ORB_Core.h"
#include "orbsvcs/CosEventCommC.h"
#include "orbsvcs/Notify/Event.h"
#include "orbsvcs/Notify/Properties.h"

TAO_BEGIN_VERSIONED_NAMESPACE_DECL

/// Construct a push consumer for the given proxy supplier; the proxy is
/// forwarded unchanged to the TAO_Notify_Consumer base class.
TAO_Notify_PushConsumer::TAO_Notify_PushConsumer (
    TAO_Notify_ProxySupplier* proxy)
  : TAO_Notify_Consumer (proxy)
{
  // No additional set-up is done here; see init().
}

/// Destructor. The body performs no explicit clean-up.
TAO_Notify_PushConsumer::~TAO_Notify_PushConsumer (void)
{
}

/// Bind this object to the client's CosEventComm::PushConsumer reference.
///
/// When a separate dispatching ORB is configured, the reference is
/// re-created in that ORB by round-tripping it through its stringified form.
///
/// @param push_consumer  Consumer reference that events are pushed to;
///                       must not be nil.
/// @throws CORBA::BAD_PARAM if @a push_consumer is nil.
///
/// CORBA exceptions raised while the references are being set up are caught
/// here and not propagated: TRANSIENT is logged, all others are ignored.
void
TAO_Notify_PushConsumer::init (CosEventComm::PushConsumer_ptr push_consumer)
{
  // Initialize only once
  ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) );

  // push_consumer not optional
  if (CORBA::is_nil (push_consumer))
  {
    throw CORBA::BAD_PARAM();
  }

  try
  {
    if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ())
      {
        this->push_consumer_ = CosEventComm::PushConsumer::_duplicate (push_consumer);

        // A checked narrow: this may contact the consumer and may throw,
        // in which case publish_ is simply left unset (see handlers below).
        this->publish_ =
          CosNotifyComm::NotifyPublish::_narrow (push_consumer);
      }
    else
      {
        // "Port" consumer's object reference from receiving ORB to dispatching ORB.
        //
        // The ORB references are held in _var objects, the same way
        // get_ior() holds the result of orb(), so that the reference handed
        // back by each accessor is released when we are done with it
        // instead of being dropped as an unnamed temporary.
        CORBA::ORB_var receiving_orb =
          TAO_Notify_PROPERTIES::instance()->orb();
        CORBA::String_var temp =
          receiving_orb->object_to_string(push_consumer);

        CORBA::ORB_var dispatching_orb =
          TAO_Notify_PROPERTIES::instance()->dispatching_orb();
        CORBA::Object_var obj =
          dispatching_orb->string_to_object(temp.in());

        // _unchecked_narrow() returns a reference we own; assigning it
        // straight to the _var member adopts it, so no extra _duplicate
        // (and matching release) is needed.
        this->push_consumer_ =
          CosEventComm::PushConsumer::_unchecked_narrow(obj.in());

        //
        // Note that here we do an _unchecked_narrow() in order to avoid
        // making a call on the consumer b/c the consumer may not have activated
        // its POA just yet.  That means that before we use this reference the first
        // time, we'll actually need to call _is_a() on it, i.e., the equivalent
        // of an _narrow().  At the time of this writing, the only use of
        // this->publish_ is in TAO_NS_Consumer::dispatch_updates_i (the superclass).
        // If any other use is made of this data member, then the code to validate
        // the actual type of the target object must be refactored.
        this->publish_ =
          CosNotifyComm::NotifyPublish::_unchecked_narrow (obj.in());


        //--cj verify dispatching ORB
        if (TAO_debug_level >= 10)
          {
            ORBSVCS_DEBUG ((LM_DEBUG, "(%P|%t) Any push init dispatching ORB id is %s.\n",
                        obj->_stubobj()->orb_core()->orbid()));
          }
        //--cj end
      }
  }
  catch (const CORBA::TRANSIENT& ex)
    {
      ex._tao_print_exception ("Got a TRANSIENT in NS_PushConsumer::init");
      ORBSVCS_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_PushConsumer %@\n", this));
    }
  catch (const CORBA::Exception&)
    {
      // _narrow failed which probably means the interface is CosEventComm type.
    }
}

/// Release this consumer by destroying the object itself.
/// The object must not be used once this call has been made.
void
TAO_Notify_PushConsumer::release (void)
{
  //@@ inform factory
  delete this;
}

/// Push an Any event to the connected consumer.
///
/// @param payload  Event data forwarded unchanged to the consumer's push().
void
TAO_Notify_PushConsumer::push (const CORBA::Any& payload)
{
  //--cj verify dispatching ORB
  if (TAO_debug_level >= 10)
    {
      ORBSVCS_DEBUG ((LM_DEBUG,
                      "(%P|%t) Any push dispatching ORB id is %s.\n",
                      this->push_consumer_->_stubobj()->orb_core()->orbid()));
    }
  //--cj end

  // Stamp the time of this delivery attempt before invoking the consumer.
  this->last_ping_ = ACE_OS::gettimeofday ();

  this->push_consumer_->push (payload);
}

/// Push a structured event to the connected consumer.
///
/// The event is first translated into a CORBA::Any, because the consumer
/// is invoked through its Any-based push() operation.
///
/// @param event  Structured event to translate and deliver.
void
TAO_Notify_PushConsumer::push (const CosNotification::StructuredEvent& event)
{
  CORBA::Any translated;
  TAO_Notify_Event::translate (event, translated);

  // Stamp the time of this delivery attempt before invoking the consumer.
  this->last_ping_ = ACE_OS::gettimeofday ();

  this->push_consumer_->push (translated);
}

/// Push a batch of events to this consumer.
///
/// Not implemented for this consumer type: the call trips an assertion and
/// otherwise does nothing with @a event.
void
TAO_Notify_PushConsumer::push (const CosNotification::EventBatch& event)
{
  ACE_UNUSED_ARG (event);

  // TODO exception?
  ACE_ASSERT(false);
}

/// Return the stringified object reference of the connected consumer.
///
/// @return The string produced by object_to_string(), or an empty string
///         if a CORBA exception is raised while producing it.
ACE_CString
TAO_Notify_PushConsumer::get_ior (void) const
{
  ACE_CString ior_text;
  CORBA::ORB_var the_orb = TAO_Notify_PROPERTIES::instance()->orb();

  try
    {
      CORBA::String_var stringified =
        the_orb->object_to_string(this->push_consumer_.in());
      const char* raw = stringified.in ();
      ior_text = raw;
    }
  catch (const CORBA::Exception&)
    {
      // Report failure to the caller as an empty string.
      ior_text.fast_clear();
    }

  return ior_text;
}

/// Take over the consumer reference held by @a old_consumer.
///
/// Initializes this object with the old consumer's push consumer reference
/// and then calls schedule_timer(false).
///
/// @param old_consumer  Consumer being replaced; must actually be a
///                      TAO_Notify_PushConsumer.
/// @throws CORBA::BAD_PARAM if @a old_consumer is not a
///         TAO_Notify_PushConsumer, or (via init()) if its consumer
///         reference is nil.
void
TAO_Notify_PushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer)
{
  TAO_Notify_PushConsumer* tmp =
    dynamic_cast<TAO_Notify_PushConsumer*> (old_consumer);
  ACE_ASSERT(tmp != 0);

  // ACE_ASSERT is compiled out when ACE_NDEBUG is defined, so a failed cast
  // would otherwise be dereferenced below.  Report it the same way init()
  // reports an unusable consumer.
  if (tmp == 0)
    {
      throw CORBA::BAD_PARAM ();
    }

  this->init(tmp->push_consumer_.in());
  this->schedule_timer(false);
}

/// Return a new (duplicated) reference to the connected consumer.
///
/// @return A duplicate of the stored push consumer reference; the caller
///         receives its own reference to release.
CORBA::Object_ptr
TAO_Notify_PushConsumer::get_consumer (void)
{
  CosEventComm::PushConsumer_ptr duplicate =
    CosEventComm::PushConsumer::_duplicate (this->push_consumer_.in ());
  return duplicate;
}

TAO_END_VERSIONED_NAMESPACE_DECL