summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/Notify/Ordering/Notify_Structured_Push_Consumer.cpp
blob: d8e9f5a655245df69aacb18a02c6037aced23b58 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// $Id$

#include "Notify_Structured_Push_Consumer.h"
#include "Notify_Test_Client.h"
#include "common.h"

#include "orbsvcs/TimeBaseC.h"

#include "tao/debug.h"

#include "ace/OS_NS_unistd.h"

// Construct a consumer for the ordering test.
//
//   name         - stored in name_.
//   policy       - stored in order_policy_; _connect() sends it as the
//                  proxy's OrderPolicy QoS when use_ordering is true.
//   use_ordering - stored in use_ordering_; gates the set_qos() call in
//                  _connect().
//   expected     - number of events this consumer expects to receive.
//   client       - test client that is told when this consumer starts
//                  and (from push_structured_event) when it is done.
//
// The received-event counter and the remembered id of the first event
// both start at zero.
Notify_Structured_Push_Consumer::Notify_Structured_Push_Consumer (
    const char* name,
    CORBA::Short policy,
    bool use_ordering,
    int expected,
    Notify_Test_Client& client)
  : name_ (name)
  , order_policy_ (policy)
  , use_ordering_ (use_ordering)
  , expected_ (expected)
  , count_ (0)
  , first_ (0)
  , client_ (client)
{
  // Register with the test client right away; the matching
  // consumer_done() call is made once enough events have arrived.
  client_.consumer_start (this);
}


// Connect this consumer to the notification channel.
//
// Obtains a structured-event push supplier proxy from @a consumer_admin
// (its id is written to proxy_id_), narrows it into proxy_, optionally
// applies the configured OrderPolicy QoS to that proxy, and finally
// connects this servant's object reference as the proxy's push consumer.
//
// Every remote invocation is followed by ACE_CHECK so that, in builds
// using emulated exceptions, a failure aborts the remaining steps.
void
Notify_Structured_Push_Consumer::_connect (
                CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin
                ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // Activate this servant (implicitly, via _this) and get its reference.
  CosNotifyComm::StructuredPushConsumer_var objref =
    this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  CosNotifyChannelAdmin::ProxySupplier_var proxysupplier =
    consumer_admin->obtain_notification_push_supplier (
      CosNotifyChannelAdmin::STRUCTURED_EVENT,
      proxy_id_
      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->proxy_ =
    CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow (
      proxysupplier.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  if (use_ordering_)
  {
    // Ask the proxy to deliver events according to the order policy
    // chosen at construction time.
    CosNotification::QoSProperties properties (1);
    properties.length (1);
    properties[0].name = CORBA::string_dup (CosNotification::OrderPolicy);
    properties[0].value <<= this->order_policy_;

    // Pass the environment and check it like every other call here, so
    // a set_qos failure is not silently ignored with emulated exceptions.
    this->proxy_->set_qos (properties ACE_ENV_ARG_PARAMETER);
    ACE_CHECK;
  }

  this->proxy_->connect_structured_push_consumer (objref.in ()
                                                           ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // give ownership to POA
  this->_remove_ref (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}


// Receive one structured event and verify that it arrived in the order
// implied by the configured order policy.
//
// The event is required (via ACE_ASSERT) to carry exactly three
// variable-header fields, the first of which is named "id" and holds
// the event's id as a CORBA::Long.
//
// Bookkeeping performed on every call:
//   - the first call sleeps for two seconds before counting the event;
//   - count_ is incremented;
//   - an error is logged if more than expected_ events have arrived;
//   - the test client is notified via consumer_done() once count_
//     reaches (or exceeds) expected_.
//
// Ordering check:
//   - the first event is never checked; its id is remembered in first_;
//   - for PriorityOrder / DeadlineOrder, ids are expected to count down
//     from expected_ - 1, skipping the id already taken by the first
//     event;
//   - for any other policy, the n-th event is expected to carry id n - 1.
void
Notify_Structured_Push_Consumer::push_structured_event (
                          const CosNotification::StructuredEvent& event
                          ACE_ENV_ARG_DECL_NOT_USED /*ACE_ENV_SINGLE_ARG_PARAMETER*/)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_DEBUG((LM_DEBUG, "-"));
  if (count_ == 0)
  {
    // Sleep long enough to force the channel to back up, otherwise
    // there will be no ordering.
    ACE_OS::sleep(2);
  }

  ++count_;

  if (this->count_ > this->expected_)
  {
    ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: too many events received.\n")));
  }

  if (this->count_ >= this->expected_)
  {
    this->client_.consumer_done (this);
  }

  CORBA::Long id = 0;

  ACE_ASSERT(event.header.variable_header.length() == 3);
  ACE_ASSERT(ACE_OS::strcmp(event.header.variable_header[0].name.in(), "id") == 0);

  // Any extraction reports failure through its Boolean result. Without
  // this check a mistyped "id" field would silently leave id == 0 and
  // the comparisons below would report a misleading mismatch.
  if (!(event.header.variable_header[0].value >>= id))
  {
    ACE_ERROR ((LM_ERROR,
                ACE_TEXT ("\nError: unable to extract the event id.\n")));
    return;
  }

  // The first event won't necessarily be in order, because we hadn't yet forced
  // the channel to queue events.
  if (count_ > 1)
  {
    if (order_policy_ == CosNotification::PriorityOrder
      || order_policy_ == CosNotification::DeadlineOrder)
    {
      // Expected ids run downwards as more events arrive.
      int eid = expected_ - count_ + 1;
      // The id delivered first is no longer in the queue, so every
      // expected id at or below it shifts down by one.
      if (eid <= first_)
        --eid;

      // On mismatch, the error prefix is emitted first and the debug
      // line that follows supplies the details.
      if (id != eid)
        ACE_ERROR((LM_ERROR, "\nError: "));
      ACE_DEBUG((LM_DEBUG, "Expected id:%d Received id:%d\n", eid, id));
    }
    else
    {
      // Any other policy: the n-th event must carry id n - 1.
      if (id != count_ - 1)
        ACE_ERROR((LM_ERROR, "\nError: Expected id:%d Received id:%d\n", count_ - 1, id));
    }
  }
  else
  {
    ACE_DEBUG((LM_DEBUG, "Ignoring first event. id=%d\n", id));
    first_ = id;
  }
}