summaryrefslogtreecommitdiff
path: root/docs/tutorials/015/Protocol_Stream.cpp
blob: 058ec300b2df0fa3711cfb172cbcac33751ad04c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171

// $Id$

#include "Protocol_Stream.h"
#include "Protocol_Task.h"

#include "Xmit.h"
#include "Recv.h"

#include "Compressor.h"
#include "Crypt.h"

#include "ace/Stream_Modules.h"

/* You can choose at compile time to include/exclude the protocol
   pieces.
*/
#define ENABLE_COMPRESSION
#define ENABLE_ENCRYPTION

// The usual typedefs to make things easier to type.
typedef ACE_Module<ACE_MT_SYNCH> Module;
typedef ACE_Thru_Task<ACE_MT_SYNCH> Thru_Task;

/* Constructor.  Nothing is assembled here -- the stream itself is
   built in open() -- but recv_ is cleared so that get() can reliably
   detect a Protocol_Stream that was never opened instead of testing
   an uninitialized pointer.
 */
Protocol_Stream::Protocol_Stream( void )
    : recv_( 0 )
{
}

// Destructor with an intentionally empty body.
Protocol_Stream::~Protocol_Stream( void )
{
}

/* Even opening the stream is rather simple.  The important thing to
   remember is that the modules you push onto the stream first will be
   at the tail (i.e. -- most downstream) end of things when you're
   done.
 */
int Protocol_Stream::open( ACE_SOCK_Stream & _peer, Protocol_Task * _reader )
{
    // Point our own peer() at the socket handle the caller gave us.
    this->peer_.set_handle( _peer.get_handle() );

    // Keep a pointer to the Recv task; get() uses it later to ask
    // for data from the peer().
    this->recv_ = new Recv( this->peer() );

    // The transmit/receive pair goes onto the stream first.  Every
    // module pushed afterwards lands above it, so this pair ends up
    // closest to the tail.
    Module * xmit_recv_module =
        new Module( "Xmit/Recv", new Xmit( this->peer() ), this->recv_ );

    if( this->stream().push( xmit_recv_module ) == -1 )
    {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "stream().push( xmit/recv )"), -1);
    }

    // Let the no-argument open() push the remaining protocol
    // modules on top of Xmit/Recv.
    if( this->open() == -1 )
    {
        return -1;
    }

    // Without a reader task there is nothing more to add.
    if( _reader == 0 )
    {
        return 0;
    }

    // The caller's reader task is pushed last, so data read from
    // the peer() passes through it after all of the other modules.
    // A module always needs a pair of tasks, so a Thru_Task is
    // used as a no-op on the writer side.
    Module * reader_module = new Module( "Reader", new Thru_Task(), _reader );

    if( this->stream().push( reader_module ) == -1 )
    {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "stream().push( reader )"), -1);
    }

    return 0;
}

/* Add the necessary protocol objects to the stream.  The way we're
   pushing things on we will encrypt the data before compressing it.
*/
// Push the optional protocol modules onto the stream.  Which ones
// are pushed is decided at compile time by ENABLE_COMPRESSION and
// ENABLE_ENCRYPTION.  The compression module is pushed first and
// the encryption module second.  Each module gets its own pair of
// task objects (one per side of the module).
// Returns 0 on success, or -1 (after logging) if a push fails.
int Protocol_Stream::open(void)
{
#if defined(ENABLE_COMPRESSION)
    if( stream().push( new Module( "compress", new Compressor(), new Compressor() ) ) == -1 )
    {
            // Diagnostic text previously misspelled "compressor".
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "stream().push( compressor )"), -1);
    }
#endif // ENABLE_COMPRESSION

#if defined(ENABLE_ENCRYPTION)
    if( stream().push( new Module( "crypt", new Crypt(), new Crypt() ) ) == -1 )
    {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "stream().push( crypt )"), -1);
    }
#endif // ENABLE_ENCRYPTION

    return( 0 );
}

// Closing the Protocol_Stream is as simple as closing the ACE_Stream.
int Protocol_Stream::close(void)
{
    return stream().close();
}

// Simply pass the data directly to the ACE_Stream.
int Protocol_Stream::put(ACE_Message_Block * & _message, ACE_Time_Value * _timeout )
{
    // Forward the message and timeout to the ACE_Stream untouched
    // and report its result to the caller.
    int const status = this->stream().put( _message, _timeout );
    return status;
}

/* Tell the Recv module to read some data from the peer and pass it
   upstream.  Servers will typically use this method in a
   handle_input() method to tell the stream to get a client's request.
*/
int Protocol_Stream::get(void)
{
        // If there is no Recv module, we're in big trouble!
    if( ! recv_ )
    {
        ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) No Recv object!\n"), -1);
    }

        // This tells the Recv module to go to it's peer() and read
        // some data.  Once read, that data will be pushed upstream.
        // If there is a reader object then it will have a chance to
        // process the data.  If not, the received data will be
        // available in the message queue of the stream head's reader
        // object (eg -- stream().head()->reader()->msg_queue()) and
        // can be read with our other get() method below.
    if( recv_->get() == -1 )
    {
        ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) Cannot queue read request\n"), -1);
    }

        // For flexibility I've added an error() method to tell us if
        // something bad has happened to the Recv object.
    if( recv_->error() )
    {
        ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) Recv object error!\n"), -1);
    }

    return(0);
}

/* Take a message block off of the stream head reader's message
   queue.  If the queue is empty, use get() to read from the peer.
   This is most often used by client applications.  Servers will
   generally insert a reader that will prevent the data from getting
   all the way upstream to the head.
*/
int Protocol_Stream::get(ACE_Message_Block * & _response, ACE_Time_Value * _timeout )
{
    // Only go to the peer when the head reader's queue has nothing
    // waiting.  The && short-circuits, so the no-argument get() is
    // not called when a message is already queued.
    if( this->stream().head()->reader()->msg_queue()->is_empty()
        && this->get() == -1 )
    {
        ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) Cannot get data into the stream.\n"), -1);
    }

    // Dequeue the next message block from the head reader, honoring
    // the caller's timeout.
    return this->stream().head()->reader()->getq( _response, _timeout );
}