summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/Notify/Discarding/Notify_Structured_Push_Consumer.cpp
blob: 50e50144748db4253eb291b364ed8d7a6198a4df (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// $Id$

#include "ace/OS_NS_unistd.h"
#include "Notify_Structured_Push_Consumer.h"
#include "Notify_Test_Client.h"
#include "common.h"
#include "tao/debug.h"

Notify_Structured_Push_Consumer::Notify_Structured_Push_Consumer (
  const char* name,
  CORBA::Short policy,
  CORBA::Long max_events_per_consumer,
  Notify_Test_Client& client)
  : name_ (name),
  discard_policy_ (policy),
  max_events_per_consumer_ (max_events_per_consumer),
  count_ (0),
  first_ (0),
  client_ (client),
  sent_(40)
{
  this->client_.consumer_start (this);
}


void
Notify_Structured_Push_Consumer::_connect (
  CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin
  ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  CosNotifyComm::StructuredPushConsumer_var objref =
    this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  CosNotifyChannelAdmin::ProxySupplier_var proxysupplier =
    consumer_admin->obtain_notification_push_supplier (
    CosNotifyChannelAdmin::STRUCTURED_EVENT,
    proxy_id_
    ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->proxy_ =
    CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow (
    proxysupplier.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  CosNotification::QoSProperties properties (2);
  properties.length (2);
  properties[0].name = CORBA::string_dup (CosNotification::DiscardPolicy);
  properties[0].value <<= this->discard_policy_;
  properties[1].name = CORBA::string_dup (CosNotification::MaxEventsPerConsumer);
  properties[1].value <<= this->max_events_per_consumer_;

  this->proxy_->set_qos (properties);
  this->proxy_->connect_structured_push_consumer (objref.in ()
    ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // give ownership to POA
  this->_remove_ref (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}


void
Notify_Structured_Push_Consumer::push_structured_event (
  const CosNotification::StructuredEvent& event
  ACE_ENV_ARG_DECL_NOT_USED /*ACE_ENV_SINGLE_ARG_PARAMETER*/)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_DEBUG((LM_DEBUG, "-"));

  this->count_++;
  if (this->count_ > max_events_per_consumer_ + 1)
  {
    this->client_.consumer_done (this);
    ACE_ERROR ((LM_ERROR,
      ACE_TEXT ("Structured Consumer (%P|%t): ERROR: too ")
      ACE_TEXT ("many events received (%d).\n"), this->count_));
  }

  ACE_ASSERT(ACE_OS::strcmp(event.header.variable_header[0].name.in(), "Id") == 0);
  CORBA::Any v = event.header.variable_header[0].value;
  CORBA::Long id = 0;
  v >>= id;

  // Force the notify service to queue events
  if (this->count_ == 1)
  {
    ACE_OS::sleep(2);
    first_ = id;
  }

  // @@ The priority header isn't making it through the notify service.
  //n = event.header.variable_header[1].name;
  //if (ACE_OS::strcmp(n.in(), CosNotification::Priority) != 0)
  //{
  //  ACE_ERROR((LM_ERROR, "Error: Couldn't find Priority header in event.\n"));
  //  break;
  //}
  //v = event.header.variable_header[1].value;
  //CORBA::Long priority = 0;
  //v >>= priority;
  CORBA::Long expected = 0;
  if (discard_policy_ == CosNotification::PriorityOrder)
  {
    expected = sent_ - max_events_per_consumer_ + count_;
    if (first_ != sent_ - max_events_per_consumer_ + 1)
      --expected;
  }
  else if (discard_policy_ == CosNotification::FifoOrder)
  {
    expected = sent_ - max_events_per_consumer_ + count_;
    if (first_ != sent_ - max_events_per_consumer_ + 1)
      --expected;
  }
  else if (discard_policy_ == CosNotification::LifoOrder)
  {
    expected = count_;
  }
  else
  {
    ACE_ERROR((LM_ERROR, "Error: Unexpected discard policy.\n"));
    return;
  }

  if (id != expected && count_ != 1)
  {
    ACE_DEBUG((LM_DEBUG, "Error: Expected %d, ", expected));
    this->client_.consumer_done (this);
  }

  ACE_DEBUG((LM_DEBUG, "received %d\n", id));

  // We should receive mepc + 1, because the first event will be in-transit
  // before our sleep causes the notify to queue events.
  // However, on some platforms, we'll only receive mepc, because filtering
  // happened before the first event.
  if (this->count_ >= this->max_events_per_consumer_)
  {
    this->client_.consumer_done (this);
  }
}