summaryrefslogtreecommitdiff
path: root/ace/Stream.h
blob: b5bcbe002d9addae27079149e3e92105fdd6ed15 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
// -*- C++ -*-

//==========================================================================
/**
 *  @file    Stream.h
 *
 *  $Id$
 *
 *  @author Douglas C. Schmidt <schmidt@uci.edu>
 */
//==========================================================================

#ifndef ACE_STREAM_H
#define ACE_STREAM_H

#include "ace/pre.h"

#include "ace/config-all.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "ace/IO_Cntl_Msg.h"
#include "ace/Message_Block.h"
#include "ace/Time_Value.h"
#include "ace/Module.h"

// Forward declaration: ACE_Stream names ACE_Stream_Iterator as a
// friend before the iterator class itself is defined later in this
// file.
template<ACE_SYNCH_DECL> class ACE_Stream_Iterator;

/**
 * @class ACE_Stream
 *
 * @brief This class is the primary abstraction for the ASX framework.
 * It is modeled after System V Streams.
 *
 * A Stream consists of a stack of <ACE_Modules>, each of which
 * contains two <ACE_Tasks>.  Even though the methods in this
 * class are virtual, this class isn't really intended for
 * subclassing unless you know what you are doing.  In
 * particular, the <ACE_Stream> destructor calls <close>; a
 * subclass override of <close> is not reached from the base-class
 * destructor, so a subclass that overrides <close> must call it
 * from its own destructor.
 */
template <ACE_SYNCH_DECL>
class ACE_Stream
{
public:
  /// Grant the iterator access to this class's private members.
  friend class ACE_Stream_Iterator<ACE_SYNCH_USE>;

  enum
  {
    /// Indicates that <close> deletes the Tasks.  Don't change this
    /// value without updating the same enum in class ACE_Module...
    M_DELETE = 3
  };

  // = Initialization and termination methods.
  /**
   * Create a Stream consisting of <head> and <tail> as the Stream
   * head and Stream tail, respectively.  If these are 0 then the
   * <ACE_Stream_Head> and <ACE_Stream_Tail> are used, respectively.
   * <arg> is the value passed in to the <open> methods of the tasks.
   */
  ACE_Stream (void *arg = 0,
              ACE_Module<ACE_SYNCH_USE> *head = 0,
              ACE_Module<ACE_SYNCH_USE> *tail = 0);

  /**
   * Open the Stream with <head> and <tail> as the Stream head and
   * Stream tail, respectively.  If these are 0 then the
   * <ACE_Stream_Head> and <ACE_Stream_Tail> are used, respectively.
   * <arg> is the value passed in to the <open> methods of the tasks.
   * Unlike the constructor, <arg> has no default here.
   *
   * NOTE(review): the meaning of the int result (here and on the
   * other int-returning methods below) is not stated in this header;
   * confirm the success/failure convention against Stream.cpp.
   */
  virtual int open (void *arg,
                    ACE_Module<ACE_SYNCH_USE> *head = 0,
                    ACE_Module<ACE_SYNCH_USE> *tail = 0);

  /// Close down the stream and release all the resources.  <flags>
  /// defaults to <M_DELETE>, which indicates that the Tasks are
  /// deleted.
  virtual int close (int flags = M_DELETE);

  /// Destructor.  Closes down the stream (by calling <close>) and
  /// releases all the resources.
  virtual ~ACE_Stream (void);

  // = ACE_Stream plumbing operations

  /// Add a new module <mod> right below the Stream head.
  virtual int push (ACE_Module<ACE_SYNCH_USE> *mod);

  /// Remove the module right below the Stream head and close it
  /// down.  <flags> defaults to <M_DELETE>.
  virtual int pop (int flags = M_DELETE);

  /// Pass back in <mod> the top module on the stream (right below
  /// the stream head).
  virtual int top (ACE_Module<ACE_SYNCH_USE> *&mod);

  /// Insert a new module <mod> below the named module <prev_name>.
  virtual int insert (const ACE_TCHAR *prev_name,
                      ACE_Module<ACE_SYNCH_USE> *mod);

  /// Replace the named module <replace_name> with a new module <mod>.
  /// <flags> defaults to <M_DELETE>.
  virtual int replace (const ACE_TCHAR *replace_name,
                       ACE_Module<ACE_SYNCH_USE> *mod,
                       int flags = M_DELETE);

  /// Remove the module named <mod> from the stream.  This bypasses
  /// the strict LIFO ordering of <push> and <pop>.  <flags> defaults
  /// to <M_DELETE>.
  virtual int remove (const ACE_TCHAR *mod,
                      int flags = M_DELETE);

  /// Return current stream head.
  virtual ACE_Module<ACE_SYNCH_USE> *head (void);

  /// Return current stream tail.
  virtual ACE_Module<ACE_SYNCH_USE> *tail (void);

  /// Find the ACE_Module named <mod>.
  /// NOTE(review): the result when no module matches is not stated
  /// in this header (presumably 0) -- confirm in Stream.cpp.
  virtual ACE_Module<ACE_SYNCH_USE> *find (const ACE_TCHAR *mod);

  /// Create a pipe between this Stream and the Stream passed in.
  virtual int link (ACE_Stream<ACE_SYNCH_USE> &);

  /// Remove a pipe formed between two Streams.
  virtual int unlink (void);

  // = Blocking data transfer operations
  /**
   * Send the message <mb> down the stream, starting at the Module
   * below the Stream head.  Wait for up to <timeout> amount of
   * absolute time for the operation to complete (or block forever if
   * <timeout> == 0).
   */
  virtual int put (ACE_Message_Block *mb,
                   ACE_Time_Value *timeout = 0);

  /**
   * Read the message that is stored in the stream head and pass it
   * back in <mb>.  Wait for up to <timeout> amount of absolute time
   * for the operation to complete (or block forever if
   * <timeout> == 0).
   */
  virtual int get (ACE_Message_Block *&mb,
                   ACE_Time_Value *timeout = 0);

  /// Send the control message <cmd>, with its argument <args>, down
  /// the stream.
  virtual int control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
                       void *args);

  /// Synchronize with the final close of the stream.
  virtual int wait (void);

  /// Dump the state of an object.
  virtual void dump (void) const;

  /// Declare the dynamic allocation hooks.
  ACE_ALLOC_HOOK_DECLARE;

private:
  /// Actually perform the unlinking of two Streams (must be called
  /// with locks held).
  int unlink_i (void);

  /// Actually perform the linking of two Streams (must be called with
  /// locks held).
  int link_i (ACE_Stream<ACE_SYNCH_USE> &);

  /// Push a new module onto the Stream.
  /// NOTE(review): the parameters are unnamed in this declaration, so
  /// the roles of the two optional trailing modules cannot be read
  /// from this header -- see the definition in Stream.cpp.
  int push_module (ACE_Module<ACE_SYNCH_USE> *,
                   ACE_Module<ACE_SYNCH_USE> * = 0,
                   ACE_Module<ACE_SYNCH_USE> * = 0);

  /// Pointer to the head of the stream.
  ACE_Module<ACE_SYNCH_USE> *stream_head_;

  /// Pointer to the tail of the stream.
  ACE_Module<ACE_SYNCH_USE> *stream_tail_;

  /// Pointer to an adjoining linked stream.
  ACE_Stream<ACE_SYNCH_USE> *linked_us_;

  // = Synchronization objects used for thread-safe streams.
  /// Protect the stream against race conditions.
  ACE_SYNCH_MUTEX_T lock_;

  /// Used to tell all threads waiting on the close that we are done.
  /// NOTE(review): this is declared after <lock_>, so <lock_> is
  /// constructed first; presumably the condition is built on <lock_>,
  /// in which case the declaration order must be kept -- confirm in
  /// Stream.cpp.
  ACE_SYNCH_CONDITION_T final_close_;
};

/**
 * @class ACE_Stream_Iterator
 *
 * @brief Iterate through the <ACE_Modules> of an <ACE_Stream>.
 *
 * <ACE_Stream> declares this class as a friend.  The only state
 * kept by the iterator is a pointer to the next module that has
 * not yet been seen.
 */
template <ACE_SYNCH_DECL>
class ACE_Stream_Iterator
{
public:
  // = Initialization method.
  /// Construct an iterator over the stream <sr>.
  /// NOTE(review): the module at which iteration starts is not
  /// visible in this header -- confirm in the implementation.
  ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE> &sr);

  // = Iteration methods.

  /// Pass back in <next_item> the next module that hasn't been seen
  /// yet.  Returns 0 when all items have been seen, else 1.
  int next (const ACE_Module<ACE_SYNCH_USE> *&next_item);

  /// Returns 1 when all items have been seen, else 0.
  int done (void) const;

  /// Move forward by one element in the set.  Returns 0 when all the
  /// items in the set have been seen, else 1.
  int advance (void);

private:
  /// Next <Module> that we haven't yet seen.
  ACE_Module<ACE_SYNCH_USE> *next_;
};

#if defined (__ACE_INLINE__)
#include "ace/Stream.i"
#endif /* __ACE_INLINE__ */

#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
#include "ace/Stream.cpp"
#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */

#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
#pragma implementation ("Stream.cpp")
#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */

#include "ace/post.h"

#endif /* ACE_STREAM_H */