summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Notify_Buffering_Strategy.cpp
blob: 739903a3b5e853fb68068124e3b7b7e0dd0c0e7a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// $Id$

#include "Notify_Buffering_Strategy.h"
#include "Notify_Command.h"

#include "orbsvcs/CosNotificationC.h"

#include "tao/debug.h"

ACE_RCSID(Notify, Notify_Buffering_Strategy, "$Id$")

// Build a buffering strategy that reads and updates the event count
// through <queue_length>.  The maximum queue length starts out as 0
// and both the order and discard policies start out as AnyOrder.
TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (
    TAO_Notify_Property_Long* const queue_length)
  : queue_length_ (queue_length)
  , max_queue_length_ (0)
  , order_policy_ (CosNotification::AnyOrder)
  , discard_policy_ (CosNotification::AnyOrder)
{
}

// Destructor: the body is intentionally empty.
TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy (void)
{
}

// Enqueue the command <mb> on <msg_queue>, applying the configured
// maximum queue length, discard policy and order policy.
//
//  - If a maximum queue length is set (non-zero) and the shared event
//    count exceeds it, one event is first removed from the head of
//    <msg_queue> (AnyOrder / FifoOrder discard policies only) and
//    released.  Any other discard policy, or a failed dequeue, drops
//    <mb> silently.
//  - <mb> is then appended (AnyOrder / FifoOrder) or inserted by
//    priority (PriorityOrder).  Any other order policy is rejected.
//  - On a successful enqueue the shared event count is incremented,
//    unless an event was discarded to make room, in which case the
//    net count is unchanged.
//
// <tv> is passed through to the queue operations as their timeout.
// The function reports nothing to the caller; failures are only
// logged.
void
TAO_Notify_Buffering_Strategy::execute (ACE_Message_Queue<ACE_SYNCH>* msg_queue, TAO_Notify_Command *mb, CORBA::Environment& /*ACE_TRY_ENV*/, ACE_Time_Value *tv)
{
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG, "Enqueing command priority %d, queue_length = %d\n",
                mb->msg_priority (), queue_length_->value ()));

  int result = 0;

  // Set to 1 once an older event has been removed to make room for <mb>.
  int discarded = 0;

  // A maximum of 0 means "no limit".
  // NOTE(review): the comparison is strictly-greater, so the count can
  // reach max_queue_length_ + 1 before discarding starts -- confirm
  // whether ">=" was intended.
  if (max_queue_length_ != 0 && queue_length_->value () > max_queue_length_)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "max queue length reached, discarding event\n"));

      if (this->discard_policy_ == CosNotification::AnyOrder ||
          this->discard_policy_ == CosNotification::FifoOrder)
        {
          // Drop the oldest event, i.e. the one at the head of the queue.
          ACE_Message_Block *first_item = 0;
          result = msg_queue->dequeue_head (first_item, tv);

          if (result != -1)
            {
              discarded = 1;

              // The discarded block is no longer reachable through the
              // queue; release it so that it is not leaked.
              // NOTE(review): assumes queued commands are heap-allocated,
              // as ACE_Message_Block::release () requires -- confirm.
              if (first_item != 0)
                first_item->release ();
            }
        }
      else
        {
          // PriorityOrder, DeadlineOrder and LifoOrder discard policies
          // are not implemented.
          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG, "Invalid discard policy\n"));
          result = -1;
        }

      if (result == -1) // we could not dequeue successfully.
        return; // behave as if we discarded this event.
    }

  // Queue according to order policy
  if (this->order_policy_ == CosNotification::AnyOrder ||
      this->order_policy_ == CosNotification::FifoOrder)
    {
      // Insert at the end of the queue.
      result = msg_queue->enqueue_tail (mb, tv);
    }
  else if (this->order_policy_ == CosNotification::PriorityOrder)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "enqueue in priority order\n"));
      result = msg_queue->enqueue_prio (mb, tv);
    }
  else  // CosNotification::DeadlineOrder and anything else: unsupported.
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "Invalid order policy\n"));
      result = -1;
    }

  if (result == -1) // we could not enqueue successfully
    {
      // NOTE(review): if an event was discarded above, the count is now
      // one too high; the original code had the same limitation.
      ACE_DEBUG ((LM_DEBUG, "Panic! failed to enqueue event"));
      return; // behave as if we discarded this event.
    }

  // Increment the global count of events.  When an older event was
  // discarded to make room, one event left the queue and one entered,
  // so the count must stay as it is; incrementing would make it drift
  // upward on every discard.
  // NOTE(review): assumes consumers decrement queue_length_ when they
  // remove events from the queue -- confirm against the consumer side.
  if (!discarded)
    (*queue_length_)++;
}

// Set the maximum queue length that execute () compares the event
// count against.  execute () skips the check when this is 0.
void
TAO_Notify_Buffering_Strategy::max_queue_length (CORBA::Long max_queue_length)
{
  max_queue_length_ = max_queue_length;
}

// Set the policy that decides where execute () inserts new commands
// in the queue (compared there against the CosNotification order
// constants).
void
TAO_Notify_Buffering_Strategy::order_policy (CORBA::Short order_policy)
{
  order_policy_ = order_policy;
}

// Set the policy that decides which queued event execute () removes
// once the maximum queue length has been exceeded.
void
TAO_Notify_Buffering_Strategy::discard_policy (CORBA::Short discard_policy)
{
  discard_policy_ = discard_policy;
}