summaryrefslogtreecommitdiff
path: root/trunk/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_ProxyConsumer.cpp
blob: bbadda70bea026c7aa6fd6bf33325e23bbeb2dc3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// $Id$

#include "orbsvcs/FtRtEvent/EventChannel/FTEC_ProxyConsumer.h"
#include "orbsvcs/FtRtEvent/EventChannel/Request_Context_Repository.h"
#include "orbsvcs/FtRtEvent/EventChannel/Replication_Service.h"
#include "../Utils/activate_with_id.h"

ACE_RCSID (EventChannel,
           TAO_FTEC_ProxyPushConsumer,
           "$Id$")

TAO_BEGIN_VERSIONED_NAMESPACE_DECL

const TAO_FTEC_ProxyPushConsumer::RollbackOperation
  TAO_FTEC_ProxyPushConsumer::rollback_obtain =
    &FtRtecEventChannelAdmin::EventChannelFacade::disconnect_push_consumer;

TAO_FTEC_ProxyPushConsumer::TAO_FTEC_ProxyPushConsumer (TAO_EC_Event_Channel_Base* event_channel)
: Inherited(event_channel)
{
}


const FtRtecEventChannelAdmin::ObjectId&
TAO_FTEC_ProxyPushConsumer::id() const
{
  return object_id_.in();
}


  /// Activate in the POA
void
TAO_FTEC_ProxyPushConsumer::activate (
     RtecEventChannelAdmin::ProxyPushConsumer_ptr &result)
{
  result = RtecEventChannelAdmin::ProxyPushConsumer::_nil();
  try{
    object_id_ = Request_Context_Repository().get_object_id();
    PortableServer::POA_var poa = _default_POA();
    activate_object_with_id(result, poa.in(), this, id());
  }
  catch (const CORBA::Exception& ex)
  {
    ex._tao_print_exception ("");
    // ignore exceptions
  }
}

    // = The RtecEventChannelAdmin::ProxyPushConsumer methods...
void TAO_FTEC_ProxyPushConsumer::connect_push_supplier (
                RtecEventComm::PushSupplier_ptr push_supplier,
                const RtecEventChannelAdmin::SupplierQOS& qos)
{
  if (Request_Context_Repository().is_executed_request())
    return;

  FtRtecEventChannelAdmin::Operation update;
  update.object_id = this->id();
  FtRtecEventChannelAdmin::Connect_push_supplier_param param;
  param.push_supplier = RtecEventComm::PushSupplier::_duplicate(push_supplier);
  param.qos = qos;
  update.param.connect_supplier_param(param);

  Inherited::connect_push_supplier(push_supplier, qos);

  try{
    FTRTEC::Replication_Service* svc = FTRTEC::Replication_Service::instance();
    ACE_Read_Guard<FTRTEC::Replication_Service> locker(*svc);

    svc->replicate_request(update,
                           &FtRtecEventChannelAdmin::EventChannelFacade::disconnect_push_consumer);
  }
  catch (...){
    this->disconnect_push_consumer();
    throw;
  }
}

void TAO_FTEC_ProxyPushConsumer::disconnect_push_consumer (void)
{
  if (Request_Context_Repository().is_executed_request())
    return;
  FtRtecEventChannelAdmin::Operation update;
  update.object_id = id();
  update.param._d(FtRtecEventChannelAdmin::DISCONNECT_PUSH_CONSUMER);

  Inherited::disconnect_push_consumer();

  FTRTEC::Replication_Service* svc = FTRTEC::Replication_Service::instance();
  ACE_Read_Guard<FTRTEC::Replication_Service> locker(*svc);

  svc->replicate_request(update, 0);
}

void TAO_FTEC_ProxyPushConsumer::get_state(FtRtecEventChannelAdmin::ProxyPushConsumerStat& state)
{
  state.object_id = this->object_id_.in ();
  if (this->is_connected()) {
    FtRtecEventChannelAdmin::ProxyPushConsumerConnectionInfo info;
    info.push_supplier = this->supplier();
    info.qos = this->publications();
    state.parameter.info(info);
  }
}

void TAO_FTEC_ProxyPushConsumer::set_state(const FtRtecEventChannelAdmin::ProxyPushConsumerStat& state)
{
  if (!CORBA::is_nil(state.parameter.info().push_supplier.in()) )
  {
    Inherited::connect_push_supplier(state.parameter.info().push_supplier.in(),
                     state.parameter.info().qos);
  }
}

TAO_END_VERSIONED_NAMESPACE_DECL