From 374261f97c6d94545689172b4a1c29832f5b6ae5 Mon Sep 17 00:00:00 2001 From: schmidt Date: Tue, 19 Aug 2003 15:08:26 +0000 Subject: ChangeLogTag:Tue Aug 19 10:07:47 2003 Douglas C. Schmidt --- docs/tutorials/015/Client_i.cpp | 2 +- docs/tutorials/015/Compressor.cpp | 103 ++++++++----------------- docs/tutorials/015/Compressor.h | 39 +++++----- docs/tutorials/015/Handler.cpp | 6 +- docs/tutorials/015/Makefile.client | 4 +- docs/tutorials/015/Makefile.server | 5 +- docs/tutorials/015/Protocol_Stream.cpp | 135 ++++++++++++++++++++++++++------- docs/tutorials/015/Protocol_Stream.h | 35 ++++++++- docs/tutorials/015/Protocol_Task.cpp | 3 +- docs/tutorials/015/Xmit.cpp | 7 +- docs/tutorials/015/client.cpp | 49 +++++++----- docs/tutorials/015/page01.html | 3 +- 12 files changed, 239 insertions(+), 152 deletions(-) (limited to 'docs') diff --git a/docs/tutorials/015/Client_i.cpp b/docs/tutorials/015/Client_i.cpp index 6415fed1966..5039fde992e 100644 --- a/docs/tutorials/015/Client_i.cpp +++ b/docs/tutorials/015/Client_i.cpp @@ -39,7 +39,7 @@ int Client::open( void ) // to ensure that our data is in the correct format when // received by the server. Thus, we open the stream and // transfer ownership of the peer. - return stream().open( peer() ); + return stream().open( peer(), 0, true); } // The remainder of the functions just delegate to the stream. diff --git a/docs/tutorials/015/Compressor.cpp b/docs/tutorials/015/Compressor.cpp index bbe76eb1a69..1cb1bf8bbf9 100644 --- a/docs/tutorials/015/Compressor.cpp +++ b/docs/tutorials/015/Compressor.cpp @@ -1,101 +1,60 @@ - // $Id$ +#include "ZlibCompressor.h" #include "Compressor.h" #include "ace/SOCK_Stream.h" -Compressor::Compressor( void ) - : Protocol_Task() +#include + +Compressor::Compressor(AlgorithmCode algorithm) { - ; + switch(algorithm) + { + case COMPRESSION_ZLIB: + compressor_ = new ZlibCompressor(); + break; + + default: + compressor_ = 0; + break; + } } Compressor::~Compressor(void) { - ; + delete compressor_; } -/* This is where you insert your compression code. Most compressors - want to work on a block of data instead of a byte-stream. - Fortunately the message block has a block that can be compressed. - Take a look at libz for a quick way to add compression to your - apps - */ -int Compressor::send(ACE_Message_Block *message, ACE_Time_Value *timeout) +int Compressor::send (ACE_Message_Block *message, ACE_Time_Value *timeout) { - ACE_UNUSED_ARG(message); - ACE_UNUSED_ARG(timeout); - - ACE_DEBUG ((LM_INFO, "(%P|%t) Compressor::send() compressing (%s)\n", message->rd_ptr() )); - - // Create a block to hold the compressed data. I belive libz - // recommends a buffer about 10-20% larger than the source. - // Other libraries/algorithms may have their own quirks. - ACE_Message_Block * compressed = new ACE_Message_Block( - message->size() +16 ); + if (compressor_ == 0) + return -1; - // Perform a bogus compression algorithm. 'CD' just tells me - // that this is compressed data and when we "decompress" we'll - // look for this signature to validate the data received. - ACE_OS::sprintf( compressed->wr_ptr(), "CD:%s", message->rd_ptr() ); - compressed->wr_ptr( strlen(compressed->wr_ptr())+1 ); + ACE_Message_Block* compressed = 0; + int retval = compressor_->send(message, timeout, compressed); - // Send the compressed data down the stream to the next module + // Send the compressed data down the stream to the next module this->put_next( compressed ); - // We're done here. + // We're done here. message->release(); - return( 0 ); + return retval; } -/* And here's the decompression side. We've written Xmit/Recv so that - we're guaranteed to get an entire block of compressed data. If - we'd used recv() in the Recv object then we might have gotten a - partial block and that may not decompress very nicely. - */ -int Compressor::recv(ACE_Message_Block *message, ACE_Time_Value *timeout) +int Compressor::recv (ACE_Message_Block *message, ACE_Time_Value *timeout) { - ACE_UNUSED_ARG(message); - ACE_UNUSED_ARG(timeout); + if (compressor_ == 0) + return -1; - ACE_DEBUG ((LM_INFO, "(%P|%t) Compress::recv() decompressing (%s)\n", message->rd_ptr() )); + ACE_Message_Block* decompressed = 0; + int retval = compressor_->recv(message, timeout, decompressed); - // Room for the decompressed data. In the real world you - // would probably want to send the original (uncompressed) - // data size in the message. You can predict the maximum - // possible decompression size but it's cheap and easy just to - // send that along. Look again at how I do exacly that - // between Xmit and Recv. - ACE_Message_Block * decompressed = new ACE_Message_Block( - message->size() + 16 ); - - // Check for our signature. Even when you use a real - // compression algorithm you may want to include your own - // signature so that you can verify the block. It pays to be - // paranoid! - if( ACE_OS::strncmp( message->rd_ptr(), "CD:", 3 ) ) - { - ACE_DEBUG ((LM_INFO, "(%P|%t) Improperly encompressed data.\n" )); - message->release(); - return(-1); - } - - // Skip past the signature before going any further. - message->rd_ptr( 3 ); - - // Perform a bogus decompression algorithm. This is where you - // would feed to libz or your favorite decompressor. (It's - // costly but you could invoke popen() on gzip!) - ACE_OS::sprintf( decompressed->wr_ptr(), "%s", message->rd_ptr() ); - decompressed->wr_ptr( strlen(decompressed->wr_ptr())+1 ); - - // Recv the decompressed data down the stream to the next module + // Recv the decompressed data down the stream to the next module this->put_next( decompressed ); - // We're done here. + // We're done here. message->release(); - return( 0 ); + return retval; } - diff --git a/docs/tutorials/015/Compressor.h b/docs/tutorials/015/Compressor.h index cb4c7248cce..cf86f0caedc 100644 --- a/docs/tutorials/015/Compressor.h +++ b/docs/tutorials/015/Compressor.h @@ -1,35 +1,40 @@ // $Id$ #ifndef COMPRESSOR_H -#define COMPRESSOR_h +#define COMPRESSOR_H #include "Protocol_Task.h" -/* A reallly dumb compression object. (It actually adds 3 bytes to - every message block.) -*/ +#include "CompressorBase.h" + class Compressor : public Protocol_Task { public: + enum AlgorithmCode + { + COMPRESSION_NONE = 0 + , COMPRESSION_ZLIB = 1 + , COMPRESSION_RLE = 2 // for future expansion + , COMPRESSION_BZIP2 = 3 // for future expansion + }; - typedef Protocol_Task inherited; - - Compressor (void); + Compressor(AlgorithmCode algorithm); - ~Compressor (void); + virtual ~Compressor (void); protected: - // This is called when the compressor is on the downstream side. - // We'll take the message, compress it and move it along to the next - // module. - int send (ACE_Message_Block *message, - ACE_Time_Value *timeout); + // This is called when the compressor is on the + // downstream side. We'll take the message, compress it + // and move it along to the next module. + virtual int send (ACE_Message_Block *message, ACE_Time_Value *timeout); + + // This one is called on the upstream side. No surprise: we + // decompress the data and send it on up the stream. + virtual int recv (ACE_Message_Block *message, ACE_Time_Value *timeout); - // This one is called on the upstream side. No surprise: we - // decompress the data and send it on up the stream. - int recv (ACE_Message_Block *message, - ACE_Time_Value *timeout); +private: + CompressorBase* compressor_; }; #endif /* COMPRESSOR_H */ diff --git a/docs/tutorials/015/Handler.cpp b/docs/tutorials/015/Handler.cpp index e940a8a8ce7..201d3605c41 100644 --- a/docs/tutorials/015/Handler.cpp +++ b/docs/tutorials/015/Handler.cpp @@ -9,7 +9,7 @@ more when we talk about the stream in detail. For now it's enough to know that Handler_Task::recv() will be invoked by the stream after data from the client has been received and processed (eg -- - decrypted, uncompressed, and whatever else the protocol requires.) + uncompressed and whatever else the protocol requires.) */ class Handler_Task : public Protocol_Task { @@ -66,7 +66,7 @@ int Handler::open (void *) // read client requests and send our responses. We also // provide a Handler_Task instance that will ultimately be // responsible for processing any client data we receive. - int rval = stream().open( this->peer(), new Handler_Task() ); + int rval = stream().open( this->peer(), new Handler_Task(), false ); // Of course, we have to account for the chance that the // stream's open() may fail. @@ -161,7 +161,7 @@ int Handler_Task::recv(ACE_Message_Block * message, ACE_DEBUG ((LM_INFO, "(%P|%t) Handler_Task::recv() got (%s)\n", message->rd_ptr() )); // Create a response message to send to the client - ACE_Message_Block * response = new ACE_Message_Block( 128 ); + ACE_Message_Block * response = new ACE_Message_Block( 1024 ); // Nothing very original about this I'm afraid... ACE_OS::sprintf( response->wr_ptr(), "You Said: (%s)", message->rd_ptr() ); diff --git a/docs/tutorials/015/Makefile.client b/docs/tutorials/015/Makefile.client index 20680aea15b..440b813cfb3 100644 --- a/docs/tutorials/015/Makefile.client +++ b/docs/tutorials/015/Makefile.client @@ -11,8 +11,10 @@ FILES += Protocol_Stream FILES += Protocol_Task FILES += Xmit FILES += Recv +FILES += CompressionSocket FILES += Compressor -FILES += Crypt +FILES += CompressorBase +FILES += ZlibCompressor FILES += Client_i BUILD = $(VBIN) diff --git a/docs/tutorials/015/Makefile.server b/docs/tutorials/015/Makefile.server index 109cecc8e90..bc44c1d3867 100644 --- a/docs/tutorials/015/Makefile.server +++ b/docs/tutorials/015/Makefile.server @@ -11,9 +11,10 @@ FILES += Protocol_Stream FILES += Protocol_Task FILES += Xmit FILES += Recv +FILES += CompressionSocket FILES += Compressor -FILES += Crypt - +FILES += CompressorBase +FILES += ZlibCompressor FILES += Handler FILES += Server_i diff --git a/docs/tutorials/015/Protocol_Stream.cpp b/docs/tutorials/015/Protocol_Stream.cpp index e1c210ec119..6b7fd05c3cc 100644 --- a/docs/tutorials/015/Protocol_Stream.cpp +++ b/docs/tutorials/015/Protocol_Stream.cpp @@ -7,15 +7,12 @@ #include "Recv.h" #include "Compressor.h" -#include "Crypt.h" #include "ace/Stream_Modules.h" /* You can choose at compile time to include/exclude the protocol pieces. */ -#define ENABLE_COMPRESSION -#define ENABLE_ENCRYPTION // The usual typedefs to make things easier to type. typedef ACE_Module Module; @@ -49,7 +46,8 @@ Protocol_Stream::~Protocol_Stream (void) */ int Protocol_Stream::open (ACE_SOCK_Stream &peer, - Protocol_Task *reader) + Protocol_Task *reader, + bool isOriginator) { // Initialize our peer() to read/write the socket we're given peer_.set_handle (peer.get_handle ()); @@ -73,7 +71,7 @@ Protocol_Stream::open (ACE_SOCK_Stream &peer, // Add any other protocol tasks to the stream. Each one is added at // the head. The net result is that Xmit/Recv are at the tail. - if (this->open () == -1) + if (this->open (isOriginator) == -1) return -1; // If a reader task was provided then push that in as the upstream @@ -96,32 +94,115 @@ Protocol_Stream::open (ACE_SOCK_Stream &peer, return 0; } -/* Add the necessary protocol objects to the stream. The way we're - pushing things on we will compress the data before encrypting it. +/* Add the necessary protocol objects to the stream. */ + int -Protocol_Stream::open (void) +Protocol_Stream::open (bool isOriginator) { -#if defined (ENABLE_ENCRYPTION) - if (stream ().push (new Module ("crypt", - new Crypt (), - new Crypt ())) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "stream().push(crypt)"), - -1); -#endif /* ENABLE_ENCRYPTION */ + // We are protocol version 1, we only support ZLIB compression currently. -#if defined (ENABLE_COMPRESSION) - if (stream ().push (new Module ("compress", - new Compressor (), - new Compressor ())) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "stream().push(comprssor)"), - -1); -#endif /* ENABLE_COMPRESSION */ - return 0; + ProtocolVersion pversion = PROTOCOL_VERSION_1; + Compressor::AlgorithmCode acode = Compressor::COMPRESSION_ZLIB; + + // If we are the originator, we put a message asking for + // what compression is to be used. + + if (isOriginator) + { + ACE_Message_Block * message = new ACE_Message_Block( 1024 ); + + CompressionNegotiation neg; + neg.protocolVersion = pversion; + neg.algorithmCount = 1; + neg.algorithm[0] = acode; + + memcpy(message->wr_ptr(), &neg, sizeof(neg)); + message->wr_ptr(sizeof(neg)); + + if (put(message, 0) < 0) + { + return -1; + } + + ACE_Message_Block * response; + if (get(response, 0) < 0) + { + return -1; + } + + CompressionNegotiationReply negResponse; + memcpy(&negResponse, response->rd_ptr(), sizeof(negResponse)); + + response->release(); + if (negResponse.errorCode != NEGOTIATE_OK) + { + return -1; + } + } + else + { + // Read the message to see what algorithm the originator speaks. + + ACE_Message_Block* message; + if (get(message, 0) < 0) + { + return -1; + } + + CompressionNegotiation neg; + memcpy(&neg, message->rd_ptr(), sizeof(neg)); + message->release(); + + // Reply to handshake message. + + CompressionNegotiationReply negResponse; + negResponse.errorCode = NEGOTIATE_OK; + negResponse.protocolVersion = PROTOCOL_VERSION_1; + + if (neg.protocolVersion == PROTOCOL_VERSION_1) + { + // Ensure that incoming algorithms include the one we support. + + negResponse.algorithm = Compressor::COMPRESSION_NONE; + for (int i = 0; i < neg.algorithmCount; i++) + { + if (neg.algorithm[i] == acode) + { + negResponse.algorithm = acode; + break; + } + } + if (negResponse.algorithm == Compressor::COMPRESSION_NONE) + { + negResponse.errorCode = NEGOTIATE_UNSUPPORTED_ALGORITHM; + } + } + else + { + negResponse.errorCode = NEGOTIATE_UNSUPPORTED_VERSION; + } + + ACE_Message_Block* reply = new ACE_Message_Block( 1024 ); + memcpy(reply->wr_ptr(), &negResponse, sizeof(negResponse)); + reply->wr_ptr(sizeof(neg)); + + if (put(reply, 0) < 0) + { + return -1; + } + } + + // Add the compression component to the pipeline. + + if (stream ().push (new Module ("compress", + new Compressor (acode), + new Compressor (acode))) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "stream().push(compressor)"), + -1); + return 0; } // Closing the Protocol_Stream is as simple as closing the ACE_Stream. diff --git a/docs/tutorials/015/Protocol_Stream.h b/docs/tutorials/015/Protocol_Stream.h index 5209ed68907..f846f9efb54 100644 --- a/docs/tutorials/015/Protocol_Stream.h +++ b/docs/tutorials/015/Protocol_Stream.h @@ -33,7 +33,8 @@ public: // reader task just below the stream head so that it can process // data read from the peer. int open (ACE_SOCK_Stream &peer, - Protocol_Task *reader = 0); + Protocol_Task *reader, + bool isOriginator); // Close the stream. All of the tasks & modules will also be // closed. @@ -64,6 +65,32 @@ public: } private: + enum ProtocolVersion + { + PROTOCOL_VERSION_1 = 1 + }; + + enum CompressionNegotiationErrorCode + { + NEGOTIATE_OK = 0 + , NEGOTIATE_UNSUPPORTED_VERSION = -1 + , NEGOTIATE_UNSUPPORTED_ALGORITHM = -2 + }; + + struct CompressionNegotiation + { + unsigned char protocolVersion; + unsigned char algorithmCount; + unsigned char algorithm[5]; // as of protocol version 1. + }; + + struct CompressionNegotiationReply + { + signed char errorCode; + unsigned char protocolVersion; + unsigned char algorithm; + }; + // Our peer connection ACE_SOCK_Stream peer_; @@ -71,16 +98,18 @@ private: Stream stream_; // A task which is capable of receiving data on a socket. - // Note that this is only useful by server-side applications. + // Note that this is only useful by client-side applications. Recv *recv_; + bool isHandshakeComplete_; + Stream &stream (void) { return this->stream_; } // Install the protocol tasks into the stream. - int open (void); + int open (bool isOriginator); }; #endif /* PROTOCOL_STREAM_H */ diff --git a/docs/tutorials/015/Protocol_Task.cpp b/docs/tutorials/015/Protocol_Task.cpp index 6060fb1ddfa..da202a50b46 100644 --- a/docs/tutorials/015/Protocol_Task.cpp +++ b/docs/tutorials/015/Protocol_Task.cpp @@ -46,7 +46,8 @@ int Protocol_Task::svc(void) we're moving data upstream or downstream and invoke the appropriate virtual function to handle it. */ -int Protocol_Task::process(ACE_Message_Block * message, ACE_Time_Value *timeout) +int Protocol_Task::process(ACE_Message_Block * message, + ACE_Time_Value *timeout) { if( this->is_writer() ) { diff --git a/docs/tutorials/015/Xmit.cpp b/docs/tutorials/015/Xmit.cpp index 28be01b1d25..4f2ed4f274f 100644 --- a/docs/tutorials/015/Xmit.cpp +++ b/docs/tutorials/015/Xmit.cpp @@ -47,10 +47,10 @@ int Xmit::send(ACE_Message_Block *message, ACE_Time_Value *timeout) { int rval; - ACE_DEBUG ((LM_INFO, "(%P|%t) Xmit::send() sending (%s)(%d)\n", message->rd_ptr(), message->length() )); + ACE_DEBUG ((LM_INFO, "(%P|%t) Xmit::send() sending buff of len %d\n", message->length() )); /* Since we're going to be sending data that may have been - compressed and encrypted it's probably important for the + compressed it's probably important for the receiver to get an entire "block" instead of having a partial read. @@ -75,7 +75,8 @@ int Xmit::send(ACE_Message_Block *message, ACE_Time_Value *timeout) containing msize and the message data and send it all at once. */ - rval = this->peer().send_n( message->rd_ptr(), message->length(), 0, timeout ); + rval = this->peer().send_n(message->rd_ptr(), + message->length(), 0, timeout); // Release the message block since we're done with it. message->release(); diff --git a/docs/tutorials/015/client.cpp b/docs/tutorials/015/client.cpp index 93041dcdfdd..af4433d4a23 100644 --- a/docs/tutorials/015/client.cpp +++ b/docs/tutorials/015/client.cpp @@ -8,42 +8,51 @@ int main(int argc, char *argv[]) { - // How many messages will we send? + // How many messages will we send? int mcount = argc > 1 ? ACE_OS::atoi(argv[1]) : 3; - // Construct a Client with our desired endpoint. + // Construct a Client with our desired endpoint. Client client(ACE_DEFAULT_SERVER_PORT,ACE_DEFAULT_SERVER_HOST); - // Attempt to open the connection to the server. + // Attempt to open the connection to the server. if( client.open() == -1 ) { ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Client::open()"), -1); } - // Send a few messages to the server and get some responses... + // Send a few messages to the server and get some responses... for( int i = 0 ; i < mcount ; ++i ) { - // Since we'll be using a Protocol Stream (even though we - // don't know that at this level) we require the use of - // ACE_Message_Block objects to send/receive data. - ACE_Message_Block * message = new ACE_Message_Block( 128 ); - - // Construct a silly message to send to the server. - // Notice that we're careful to add one to the strlen() so - // that we also send the end-of-string NULL character. - ACE_OS::sprintf (message->wr_ptr (), "This is message %d.", i); + // Since we'll be using a Protocol Stream (even though we + // don't know that at this level) we require the use of + // ACE_Message_Block objects to send/receive data. + ACE_Message_Block * message = new ACE_Message_Block( 1024 ); + + // Construct a silly message to send to the server. + // Notice that we're careful to add one to the strlen() so + // that we also send the end-of-string NULL character. + ACE_OS::sprintf (message->wr_ptr (), + "This is message %d. It is a bit verbose because " + "we want there to be an opportunity for some compression to occur. " + "That's why it has a whofflly style and goes on and on and on.", + i); message->wr_ptr (strlen (message->rd_ptr ())+1); - // client will take ownership of the message block so that - // we don't have to remember to release(). We *do* have - // to remember not to use it after put() since it may be - // released almost immediately. - client.put( message ); + // client will take ownership of the message block so that + // we don't have to remember to release(). We *do* have + // to remember not to use it after put() since it may be + // released almost immediately. + if (client.put( message ) < 0) + { + fprintf(stderr, "%s:%d: client put failed.\n", __FILE__, __LINE__); + return -1; + } ACE_Message_Block * response; - // get() takes an ACE_Message_Block pointer reference. We then - // assume ownership of it and must release() when we're done. + // get() takes an ACE_Message_Block pointer reference. We then + // assume ownership of it and must release() when we're done. + if( client.get( response ) == -1 ) { ACE_DEBUG ((LM_INFO, "(%P|%t) Failed to get response from server\n" )); diff --git a/docs/tutorials/015/page01.html b/docs/tutorials/015/page01.html index 0709a089c19..7d66f966e77 100644 --- a/docs/tutorials/015/page01.html +++ b/docs/tutorials/015/page01.html @@ -98,8 +98,7 @@ now from the previous tutorials. etc.)

-* Ok, I didn't really implement encryption and - compression objects. I'll leave that as a thought +* Ok, I didn't really implement encryption objects. I'll leave that as a thought exercise!


[Tutorial Index] [Continue This Tutorial]
-- cgit v1.2.1