summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h
blob: fb2bdc4627c8f63a1bb95222f7cb2a170f56c623 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/* -*- C++ -*- */
/**
 *  @file Buffering_Strategy.h
 *
 *  $Id$
 *
 *  @author Pradeep Gore <pradeep@oomworks.com>
 *
 *
 */

#ifndef TAO_Notify_BUFFERING_STRATEGY_H
#define TAO_Notify_BUFFERING_STRATEGY_H

#include /**/ "ace/pre.h"

#include "notify_serv_export.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "ace/Null_Condition.h"
#include "ace/Message_Queue.h"

#include "orbsvcs/TimeBaseC.h"

#include "Property.h"
#include "Property_T.h"
#include "AdminProperties.h"

class TAO_Notify_Method_Request;
class TAO_Notify_QoSProperties;

typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Notify_Message_Queue;

/**
 * @class TAO_Notify_Buffering_Strategy
 *
 * @brief Base Strategy to enqueue and dequeue items from a Message Queue.
 *
 */
class TAO_Notify_Serv_Export TAO_Notify_Buffering_Strategy
{
public:
  /// Constuctor
  TAO_Notify_Buffering_Strategy (
      TAO_Notify_Message_Queue& msg_queue,
       TAO_Notify_AdminProperties_var& admin_properties,
       CORBA::Long batch_size
     );

  /// Destructor
  ~TAO_Notify_Buffering_Strategy ();

  /// Update state with the following QoS Properties:
  /// Order Policy
  /// Discard Policy
  /// MaxEventsPerConsumer
  /// TAO_Notify_Extensions::BlockingPolicy
  void update_qos_properties (const TAO_Notify_QoSProperties& qos_properties);

  /// Enqueue according the enqueing strategy.
  /// Return -1 on error else the number of items in the queue.
  int enqueue (TAO_Notify_Method_Request& method_request);

  /// Dequeue batch. This method will block for @a abstime if non-zero or else blocks till an item is available.
  /// Return -1 on error or if nothing is available, else the number of items actually dequeued (1).
  int dequeue (TAO_Notify_Method_Request* &method_request,
               const ACE_Time_Value *abstime);

  /// Shutdown
  void shutdown (void);

  /// Set the new batch size.
  void batch_size (CORBA::Long batch_size);

  /// Obtain our batch size
  CORBA::Long batch_size (void);

  /// Set the max local queue length.
  void max_local_queue_length (CORBA::Long length);

protected:
  /// Apply the Order Policy and queue. return -1 on error.
  int queue (TAO_Notify_Method_Request& method_request);

  /// Discard as per the Discard Policy.
  int discard (void);

  ///= Data Members

  /// The local Message Queue
  TAO_Notify_Message_Queue& msg_queue_;

  /// Reference to the properties per event channel.
  TAO_Notify_AdminProperties_var admin_properties_;

  /// The shared global lock used by all the queues.
  ACE_SYNCH_MUTEX& global_queue_lock_;

  /// The shared Condition for global queue not full.
  ACE_SYNCH_CONDITION& global_queue_not_full_condition_;

  /// The global queue length - queue length accross all the queues.
  CORBA::Long& global_queue_length_;

  /// The maximum events that can be queued overall.
  const TAO_Notify_Property_Long& max_global_queue_length_;

  /// The maximum queue length for the local queue.
  CORBA::Long max_local_queue_length_;

  /// Order of events in internal buffers.
  TAO_Notify_Property_Short order_policy_;

  /// Policy to discard when buffers are full.
  TAO_Notify_Property_Short discard_policy_;

  /// Flag that we should use discarding(1) or blocking (0).
  int use_discarding_;

  /// The blocking timeout will be used in place of discarding
  /// This is a TAO specific extension.
  ACE_Time_Value blocking_time_; // 0 means wait forever.

  /// Condition that the local queue is not full.
  ACE_SYNCH_CONDITION local_queue_not_full_condition_;

  /// The batch size that we want to monitor for dequeuing.
  CORBA::Long batch_size_;

  /// Condition that batch size reached.
  ACE_SYNCH_CONDITION batch_size_reached_condition_;

  /// Flag to shutdown.
  int shutdown_;
};

#if defined (__ACE_INLINE__)
#include "Buffering_Strategy.inl"
#endif /* __ACE_INLINE__ */

#include /**/ "ace/post.h"

#endif /* TAO_Notify_BUFFERING_STRATEGY_H */