summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.h
blob: 92f21cb075aec1176cce8eb71b1a812ae3d51eda (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
/* -*- C++ -*- */
/**
 *  @file Routing_Slip.h
 *
 *  $Id$
 *
 *  @author Dale Wilson <wilson_d@ociweb.com>
 *
 *
 */

#ifndef TAO_NOTIFY_ROUTING_SLIP_H
#define TAO_NOTIFY_ROUTING_SLIP_H
#include "ace/pre.h"

#include "notify_serv_export.h"
#include "Event.h"
#include "Delivery_Request.h"
#include "Event_Persistence_Factory.h"

#include "Persistent_File_Allocator.h"  // for Persistent_Callback

#include <ace/Vector_T.h>
#include <ace/Malloc_Base.h>  // necessary?

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

// Forward declarations of classes/pointers/collections
// referenced from this header
class TAO_Notify_EventChannelFactory;
class TAO_Notify_Method_Request;
class TAO_Notify_ProxyConsumer;
class TAO_Notify_ProxySupplier;
class TAO_Notify_Method_Request_Queueable;

namespace TAO_Notify
{

class TAO_Notify_Serv_Export Routing_Slip_Persistence_Manager;

// Forward declarations of TAO_Notify classes/pointers/collections
// referenced from this header

/// A vector of Delivery Requests.  The body of a Routing_Slip.
typedef ACE_Vector <Delivery_Request_Ptr> Delivery_Request_Vec;

/// A vector of Methods_.  Used during recovery.
typedef ACE_Vector <TAO_Notify_Method_Request_Queueable *> Delivery_Method_Vec;

class Routing_Slip;
/// A reference-counted smart pointer to a Routing_Slip
typedef ACE_Strong_Bound_Ptr<Routing_Slip, TAO_SYNCH_MUTEX> Routing_Slip_Ptr;

class Routing_Slip_Persistence_Manager;

class Routing_Slip_Queue;

/**
 * \brief Class which manages the delivery of events to destination.
 *
 * Interacts with persistent storage to provide reliable delivery.
 */
class TAO_Notify_Serv_Export Routing_Slip : public Persistent_Callback
{
  typedef ACE_Guard< TAO_SYNCH_MUTEX > Routing_Slip_Guard;
public:
  /// "Factory" method for normal use.
  static Routing_Slip_Ptr create (const TAO_Notify_Event_var& event
    ACE_ENV_ARG_DECL);

  /// "Factory" method for use during reload from persistent storage.
  static Routing_Slip_Ptr create (
    TAO_Notify_EventChannelFactory & ecf,
    Routing_Slip_Persistence_Manager * rspm);

  void set_rspm (Routing_Slip_Persistence_Manager * rspm);

  void reconnect (ACE_ENV_SINGLE_ARG_DECL);

  /// destructor (should be private but that inspires compiler wars)
  virtual ~Routing_Slip ();

  //////////////////
  // Action requests

  /// Route this event to destinations
  /// must be the Action request after
  /// the routing slip is created.
  void route (TAO_Notify_ProxyConsumer* pc, bool reliable_channel ACE_ENV_ARG_DECL);

  /// \brief Schedule delivery to a consumer via a proxy supplier
  /// \param proxy_supplier the proxy supplier that will deliver the event
  /// \param filter should consumer-based filtering be applied?
  void dispatch (TAO_Notify_ProxySupplier * proxy_supplier, bool filter ACE_ENV_ARG_DECL);


  /////////////////////////////////////////
  /// \brief Wait until the event/routing_slip has
  /// been saved at least once.
  void wait_persist ();

  /////////////////////////////////////
  // signals from the rest of the world

  /// \brief A delivery request has been satisfied.
  void delivery_request_complete (size_t request_id);

  /// \brief This Routing_Slip reached the front of the persistence queue
  void at_front_of_persist_queue ();

  /// \brief The persistent storage has completed the last request.
  virtual void persist_complete ();

  /////////////////////////////////////////////////////
  // \brief Access the event associated with this routing slip
  const TAO_Notify_Event_var & event () const;

  /// \brief Provide an identifying number for this Routing Slip
  /// to use in debug messages.
  int sequence() const;

  /// \brief Should delivery of this event be retried if it fails?
  bool should_retry () const;

private:
  ////////////////////
  // state transitions
  void enter_state_transient (Routing_Slip_Guard & guard);
  void continue_state_transient (Routing_Slip_Guard & guard);
  void enter_state_reloaded (Routing_Slip_Guard & guard);
  void enter_state_new (Routing_Slip_Guard & guard);
  void continue_state_new (Routing_Slip_Guard & guard);
  void enter_state_complete_while_new (Routing_Slip_Guard & guard);
  void enter_state_saving (Routing_Slip_Guard & guard);
  void enter_state_saved (Routing_Slip_Guard & guard);
  void enter_state_updating (Routing_Slip_Guard & guard);
  void enter_state_changed_while_saving (Routing_Slip_Guard & guard);
  void continue_state_changed_while_saving (Routing_Slip_Guard & guard);
  void enter_state_changed (Routing_Slip_Guard & guard);
  void continue_state_changed (Routing_Slip_Guard & guard);
  void enter_state_complete (Routing_Slip_Guard & guard);
  void enter_state_deleting (Routing_Slip_Guard & guard);
  void enter_state_terminal (Routing_Slip_Guard & guard);

private:
  bool create_persistence_manager();

  /// Private constructor for use by create method
  Routing_Slip(const TAO_Notify_Event_var& event);

  /// Test to see if all deliveries are complete.
  bool all_deliveries_complete () const;

  /// This routing_slip needs to be saved.
  void add_to_persist_queue(Routing_Slip_Guard & guard);

  /// Marshal into a CDR
  void marshal (TAO_OutputCDR & cdr);

  /// Marshal from CDR
  bool unmarshal (TAO_Notify_EventChannelFactory &ecf, TAO_InputCDR & rscdr);

private:
  /// Protection for internal information
  TAO_SYNCH_MUTEX internals_;
  /// true when event persistence qos is guaranteed
  bool is_safe_;
  /// signalled when is_safe_ goes true
  ACE_SYNCH_CONDITION until_safe_;

  /// Smart pointer to this object
  /// Provides continuity between smart pointers and "Routing_Slip::this"
  /// Also lets the Routing_Slip manage its own minimum lifetime.
  Routing_Slip_Ptr this_ptr_;

  // The event being delivered.
  TAO_Notify_Event_var event_;

  /// A  mini-state machine to control persistence
  /// See external doc for circles and arrows.
  enum State
  {
    rssCREATING,
    rssTRANSIENT,
    rssRELOADED,
    rssNEW,
    rssCOMPLETE_WHILE_NEW,
    rssSAVING,
    rssSAVED,
    rssUPDATING,
    rssCHANGED_WHILE_SAVING,
    rssCHANGED,
    rssCOMPLETE,
    rssDELETING,
    rssTERMINAL
  } state_;

  /// A collection of delivery requests
  Delivery_Request_Vec delivery_requests_;

  /// Methods that should be restarted during event recovery
  Delivery_Method_Vec delivery_methods_;

  /// How many delivery requests are complete
  size_t complete_requests_;

  /// Pointer to a Routing_Slip_Persistence_Manager
  Routing_Slip_Persistence_Manager * rspm_;

  int sequence_;

  static TAO_SYNCH_MUTEX sequence_lock_;
  static int routing_slip_sequence_;
  static size_t count_enter_transient_;
  static size_t count_continue_transient_;
  static size_t count_enter_reloaded_;
  static size_t count_enter_new_;
  static size_t count_continue_new_;
  static size_t count_enter_complete_while_new_;
  static size_t count_enter_saving_;
  static size_t count_enter_saved_;
  static size_t count_enter_updating_;
  static size_t count_enter_changed_while_saving_;
  static size_t count_continue_changed_while_saving_;
  static size_t count_enter_changed_;
  static size_t count_continue_changed_;
  static size_t count_enter_complete_;
  static size_t count_enter_deleting_;
  static size_t count_enter_terminal_;

  static Routing_Slip_Queue persistent_queue_;
};

} // namespace

#if defined (__ACE_INLINE__)
#include "Routing_Slip.inl"
#endif /* __ACE_INLINE__ */

#include "ace/post.h"
#endif /* TAO_NOTIFY_ROUTING_SLIP_H */