1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
// $Id$
#include "Notify_Buffering_Strategy.h"
#include "Notify_Command.h"
#include "orbsvcs/CosNotificationC.h"
#include "tao/debug.h"
ACE_RCSID(Notify, Notify_Buffering_Strategy, "$Id$")
TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy (TAO_Notify_Property_Long* const queue_length)
: queue_length_ (queue_length),
max_queue_length_ (0),
order_policy_ (CosNotification::AnyOrder),
discard_policy_ (CosNotification::AnyOrder)
{
}
TAO_Notify_Buffering_Strategy::~TAO_Notify_Buffering_Strategy ()
{
}
void
TAO_Notify_Buffering_Strategy::execute (ACE_Message_Queue<ACE_SYNCH>* msg_queue, TAO_Notify_Command *mb, CORBA::Environment& /*ACE_TRY_ENV*/, ACE_Time_Value *tv)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "Enqueing command priority %d, queue_length = %d\n",
mb->msg_priority (), queue_length_->value ()));
int result = 0;
if (max_queue_length_ != 0 && queue_length_->value () > max_queue_length_)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "max queue length reached, discarding event\n"));
if (this->discard_policy_ == CosNotification::AnyOrder ||
this->discard_policy_ == CosNotification::FifoOrder)
{
ACE_Message_Block *first_item;
result = msg_queue->dequeue_head (first_item, tv);
}
else
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "Invalid discard policy\n"));
result = -1;
}
//case (CosNotification::PriorityOrder):
//case (CosNotification::DeadlineOrder):
//case (CosNotification::LifoOrder):
if (result == -1) // we could not dequeue successfully.
return; // behave as if we discarded this event.
}
// Queue according to order policy
if (this->order_policy_ == CosNotification::AnyOrder ||
this->order_policy_ == CosNotification::FifoOrder)
{
// Insert at the end of the queue.
result = msg_queue->enqueue_tail (mb, tv);
}
else if (this->order_policy_ == CosNotification::PriorityOrder)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "enqueue in priority order\n"));
result = msg_queue->enqueue_prio (mb, tv);
}
else // CosNotification::DeadlineOrder
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "Invalid discard policy\n"));
result = -1;
}
if (result == -1) // we could not enqueue successfully
{
ACE_DEBUG ((LM_DEBUG, "Panic! failed to enqueue event"));
return; // behave as if we discarded this event.
}
// increment the global count of events.
(*queue_length_)++;
}
void
TAO_Notify_Buffering_Strategy::max_queue_length (CORBA::Long max_queue_length)
{
this->max_queue_length_ = max_queue_length;
}
void
TAO_Notify_Buffering_Strategy::order_policy (CORBA::Short order_policy)
{
this->order_policy_ = order_policy;
}
void
TAO_Notify_Buffering_Strategy::discard_policy (CORBA::Short discard_policy)
{
this->discard_policy_ = discard_policy;
}
|