summaryrefslogtreecommitdiff
path: root/ace/Stream.h
blob: 81d073ae848312ffa262826c3ea3283f63d22ed9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/* -*- C++ -*- */
// $Id$

// ============================================================================
//
// = LIBRARY
//    ace
//
// = FILENAME
//    Stream.h
//
// = AUTHOR
//    Doug Schmidt <schmidt@uci.edu>
//
// ============================================================================

#ifndef ACE_STREAM_H
#define ACE_STREAM_H
#include "ace/pre.h"

#include "ace/ACE.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "ace/IO_Cntl_Msg.h"
#include "ace/Message_Block.h"
#include "ace/Time_Value.h"
#include "ace/Module.h"

// Forward decls.
template<ACE_SYNCH_DECL> class ACE_Stream_Iterator;

template <ACE_SYNCH_DECL>
class ACE_Stream
{
  // = TITLE
  //    This class is the primary abstraction for the ASX framework.
  //    It is moduled after System V Stream.
  //
  // = DESCRIPTION
  //    A Stream consists of a stack of <ACE_Modules>, each of which
  //    contains two <ACE_Tasks>.  Even though the methods in this
  //    class are virtual, this class isn't really intended for
  //    subclassing unless you know what you are doing.  In
  //    particular, the <ACE_Stream> destructor calls <close>, which
  //    won't be overridden properly unless you call it in a subclass
  //    destructor.
public:
  friend class ACE_Stream_Iterator<ACE_SYNCH_USE>;

  enum
  {
    M_DELETE = 3
    // Indicates that <close> deletes the Tasks.  Don't change this
    // value without updating the same enum in class ACE_Module...
  };

  // = Initializatation and termination methods.
  ACE_Stream (void *arg = 0,
              ACE_Module<ACE_SYNCH_USE> *head = 0,
              ACE_Module<ACE_SYNCH_USE> *tail = 0);
  // Create a Stream consisting of <head> and <tail> as the Stream
  // head and Stream tail, respectively.  If these are 0 then the
  // <ACE_Stream_Head> and <ACE_Stream_Tail> are used, respectively.
  // <arg> is the value past in to the <open> methods of the tasks.

  virtual int open (void *arg,
                    ACE_Module<ACE_SYNCH_USE> *head = 0,
                    ACE_Module<ACE_SYNCH_USE> *tail = 0);
  // Create a Stream consisting of <head> and <tail> as the Stream
  // head and Stream tail, respectively.  If these are 0 then the
  // <ACE_Stream_Head> and <ACE_Stream_Tail> are used, respectively.
  // <arg> is the value past in to the <open> methods of the tasks.

  virtual int close (int flags = M_DELETE);
  // Close down the stream and release all the resources.

  virtual ~ACE_Stream (void);
  // Close down the stream and release all the resources.

  // = ACE_Stream plumbing operations

  virtual int push (ACE_Module<ACE_SYNCH_USE> *mod);
  // Add a new module <mod> right below the Stream head.

  virtual int pop (int flags = M_DELETE);
  // Remove the <mod> right below the Stream head and close it down.

  virtual int top (ACE_Module<ACE_SYNCH_USE> *&mod);
  // Return the top module on the stream (right below the stream
  // head).

  virtual int insert (const ACE_TCHAR *prev_name,
                      ACE_Module<ACE_SYNCH_USE> *mod);
  // Insert a new module <mod> below the named module <prev_name>.

  virtual int replace (const ACE_TCHAR *replace_name,
                       ACE_Module<ACE_SYNCH_USE> *mod,
                       int flags = M_DELETE);
  // Replace the named module <replace_name> with a new module <mod>.

  virtual int remove (const ACE_TCHAR *mod,
                      int flags = M_DELETE);
  // Remove the named module <mod> from the stream.  This bypasses the
  // strict LIFO ordering of <push> and <pop>.

  virtual ACE_Module<ACE_SYNCH_USE> *head (void);
  // Return current stream head.

  virtual ACE_Module<ACE_SYNCH_USE> *tail (void);
  // Return current stream tail.

  virtual ACE_Module<ACE_SYNCH_USE> *find (const ACE_TCHAR *mod);
  // Find a particular ACE_Module.

  virtual int link (ACE_Stream<ACE_SYNCH_USE> &);
  // Create a pipe between two Streams.

  virtual int unlink (void);
  // Remove a pipe formed between two Streams.

  // = Blocking data transfer operations
  virtual int put (ACE_Message_Block *mb,
                   ACE_Time_Value *timeout = 0);
  // Send the message <mb> down the stream, starting at the Module
  // below the Stream head.  Wait for upto <timeout> amount of
  // absolute time for the operation to complete (or block forever if
  // <timeout> == 0).

  virtual int get (ACE_Message_Block *&mb,
                   ACE_Time_Value *timeout = 0);
  // Read the message <mb> that is stored in the the stream head.
  // Wait for upto <timeout> amount of absolute time for the operation
  // to complete (or block forever if <timeout> == 0).

  virtual int control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
                       void *args);
  // Send control message down the stream.

  virtual int wait (void);
  // Synchronize with the final close of the stream.

  virtual void dump (void) const;
  // Dump the state of an object.

  ACE_ALLOC_HOOK_DECLARE;
  // Declare the dynamic allocation hooks.

private:
  int unlink_i (void);
  // Actually perform the unlinking of two Streams (must be called
  // with locks held).

  int link_i (ACE_Stream<ACE_SYNCH_USE> &);
  // Actually perform the linking of two Streams (must be called with
  // locks held).

  int push_module (ACE_Module<ACE_SYNCH_USE> *,
                   ACE_Module<ACE_SYNCH_USE> * = 0,
                   ACE_Module<ACE_SYNCH_USE> * = 0);
  // Must a new module onto the Stream.

  ACE_Module<ACE_SYNCH_USE> *stream_head_;
  // Pointer to the head of the stream.

  ACE_Module<ACE_SYNCH_USE> *stream_tail_;
  // Pointer to the tail of the stream.

  ACE_Stream<ACE_SYNCH_USE> *linked_us_;
  // Pointer to an adjoining linked stream.

  // = Synchronization objects used for thread-safe streams.
  ACE_SYNCH_MUTEX_T lock_;
  // Protect the stream against race conditions.

  ACE_SYNCH_CONDITION_T final_close_;
  // Use to tell all threads waiting on the close that we are done.
};

template <ACE_SYNCH_DECL>
class ACE_Stream_Iterator
{
  // = TITLE
  //     Iterate through an <ACE_Stream>.
public:
  // = Initialization method.
  ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE> &sr);

  // = Iteration methods.

  int next (const ACE_Module<ACE_SYNCH_USE> *&next_item);
  // Pass back the <next_item> that hasn't been seen in the set.
  // Returns 0 when all items have been seen, else 1.

  int done (void) const;
  // Returns 1 when all items have been seen, else 0.

  int advance (void);
  // Move forward by one element in the set.  Returns 0 when all the
  // items in the set have been seen, else 1.

private:
  ACE_Module<ACE_SYNCH_USE> *next_;
  // Next <Module> that we haven't yet seen.
};

#if defined (__ACE_INLINE__)
#include "ace/Stream.i"
#endif /* __ACE_INLINE__ */

#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
#include "ace/Stream.cpp"
#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */

#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
#pragma implementation ("Stream.cpp")
#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */

#include "ace/post.h"
#endif /* ACE_STREAM_H */