diff options
author | Chris Cleeland <chris.cleeland@gmail.com> | 1997-04-10 17:10:54 +0000 |
---|---|---|
committer | Chris Cleeland <chris.cleeland@gmail.com> | 1997-04-10 17:10:54 +0000 |
commit | bc5918ede1822e9a52d283ad03aa3fbe241ba7f3 (patch) | |
tree | c28f4c0c84145d3d178a0787c1c995beb57bf83e /TAO/IIOP | |
parent | f4b3e980c7f48a655e18330666ef2d3c585e5b61 (diff) | |
download | ATCD-bc5918ede1822e9a52d283ad03aa3fbe241ba7f3.tar.gz |
Progress towards concurrency. Non-stable checkin necessary to test on
other platforms.
Diffstat (limited to 'TAO/IIOP')
-rw-r--r-- | TAO/IIOP/lib/connmgr.hh | 12 | ||||
-rw-r--r-- | TAO/IIOP/lib/giop.cpp | 327 | ||||
-rw-r--r-- | TAO/IIOP/lib/giop.hh | 47 | ||||
-rw-r--r-- | TAO/IIOP/lib/orbconf.hh | 3 | ||||
-rw-r--r-- | TAO/IIOP/lib/roa.cpp | 4 | ||||
-rw-r--r-- | TAO/IIOP/lib/roa.hh | 2 | ||||
-rw-r--r-- | TAO/IIOP/lib/tcpoa.cpp | 20 | ||||
-rw-r--r-- | TAO/IIOP/lib/tcpoa.hh | 41 |
8 files changed, 239 insertions, 217 deletions
diff --git a/TAO/IIOP/lib/connmgr.hh b/TAO/IIOP/lib/connmgr.hh index fd90b04a69c..1983e39b7a3 100644 --- a/TAO/IIOP/lib/connmgr.hh +++ b/TAO/IIOP/lib/connmgr.hh @@ -30,20 +30,20 @@ // template <class T> class autorelease { - public: - autorelease &operator = (T *ptr) +public: + autorelease& operator= (T* ptr) { if (_state) _state->release(); _state = ptr; return *this; } - operator int () { return _state ? 1 : 0; } - T *operator -> () { return _state; } + operator int () { return _state ? 1 : 0; } + T* operator -> () { return _state; } autorelease () { _state = 0; } ~autorelease () { if (_state) _state->release (); } - private: - T *_state; +private: + T* _state; }; diff --git a/TAO/IIOP/lib/giop.cpp b/TAO/IIOP/lib/giop.cpp index a4a879a4809..b873723e4ce 100644 --- a/TAO/IIOP/lib/giop.cpp +++ b/TAO/IIOP/lib/giop.cpp @@ -43,6 +43,8 @@ #include <stdlib.h> #include <string.h> +#include <ace/SOCK_Stream.h> + #if unix # include <sys/types.h> # include <sys/socket.h> @@ -140,10 +142,15 @@ dump_msg (const char *label, const unsigned char *ptr, size_t len) #endif // !DEBUG CORBA_Boolean -GIOP::send_message ( - CDR &stream, - ACE_HANDLE &connection -) +GIOP::send_message(CDR& stream, ACE_SOCK_Stream& peer) +{ + int h = peer.get_handle(); + send_message(stream, h); + peer.set_handle(h); +} + +CORBA_Boolean +GIOP::send_message (CDR& stream, ACE_HANDLE& connection) { char *buf = (char *) stream.buffer; size_t buflen = stream.next - stream.buffer; @@ -251,10 +258,15 @@ close_message [GIOP_HDR_LEN] = { void -GIOP::close_connection ( - ACE_HANDLE &fd, - void * // currently unused -) +GIOP::close_connection (ACE_SOCK_Stream& peer, void* unused) +{ + int h = peer.get_handle(); + close_connection(h, unused); + peer.set_handle(h); +} + +void +GIOP::close_connection (ACE_HANDLE& fd, void* unused) { // // It's important that we use a reliable shutdown after we send @@ -296,6 +308,14 @@ send_error (ACE_HANDLE &fd) fd = ACE_INVALID_HANDLE; } +static inline void +send_error(ACE_SOCK_Stream& peer) +{ + int h = peer.get_handle(); + send_error(h); + peer.set_handle(h); +} + // // Loop on data read ... this is required with some implementations of @@ -304,30 +324,31 @@ send_error (ACE_HANDLE &fd) // static int read_buffer ( - ACE_HANDLE fd, - char *buf, - size_t len + ACE_SOCK_Stream& peer, + char* buf, + size_t len ) { - int bytes_read = 0; + int bytes_read = 0; - while (len != 0) { - int retval; + while (len != 0) + { + int retval; - retval = ACE::recv (fd, buf, len); + retval = peer.recv(buf, len); #ifdef DEBUG - dmsg_filter (6, "read %d bytes from connection: %d", retval, fd); + dmsg_filter (6, "read %d bytes from connection: %d", retval, fd); #endif - if (retval <= 0) // EOF or error - return retval; + if (retval <= 0) // EOF or error + return retval; - len -= retval; - buf += retval; - bytes_read += retval; + len -= retval; + buf += retval; + bytes_read += retval; } - return bytes_read; + return bytes_read; } @@ -351,133 +372,137 @@ read_buffer ( // with both optimizations applied? // GIOP::MsgType -GIOP::read_message ( - ACE_HANDLE &connection, - CDR &msg, - CORBA_Environment &env -) +GIOP::read_message (ACE_HANDLE& fd, CDR& msg, CORBA_Environment& env) { - GIOP::MsgType retval; - CORBA_ULong message_size; - - // - // Read the message header off the wire. - // - // THREADING NOTE: the connection manager handed us this connection - // for exclusive use, so we need not worry about having two threads - // interleave reads of partial messages. This model is excellent - // for "lightly threaded" systems (as will be the majority in the - // near future) but makes less effective use of connection resources - // as the "duty factor" goes down because of either long calls or - // bursty contention during numerous short calls to the same server. - // - assert (msg.length > GIOP_HDR_LEN); - - msg.next = msg.buffer; - msg.remaining = GIOP_HDR_LEN; - - char *bufptr = (char _FAR *) msg.buffer; - int len; - - // - // Read the header into the buffer. - // - if ((len = read_buffer (connection, bufptr, GIOP_HDR_LEN)) - != GIOP_HDR_LEN) { - if (len == 0) { // EOF - dmsg1 ("Header EOF ... peer probably aborted connection %d", - connection); - - // - // XXX should probably find some way to report this without - // an exception, since for most servers it's not an error. - // Is it _never_ an error? Not sure ... - // - } else if (len < 0) { // error - dsockerr ("GIOP::read_message header"); - } else { // short read ... - dmsg ("read message header failed (short)"); - } - env.exception (new CORBA_COMM_FAILURE (COMPLETED_MAYBE)); - return MessageError; - } + ACE_SOCK_Stream s; + s.set_handle(fd); + return read_message(s, msg, env); +} - // - // NOTE: if message headers, or whome messages, get encrypted in - // application software (rather than by the network infrastructure) - // they should be decrypted here ... - // +GIOP::MsgType +GIOP::read_message (ACE_SOCK_Stream& connection, CDR& msg, CORBA_Environment& env) +{ + GIOP::MsgType retval; + CORBA_ULong message_size; + + // + // Read the message header off the wire. + // + // THREADING NOTE: the connection manager handed us this connection + // for exclusive use, so we need not worry about having two threads + // interleave reads of partial messages. This model is excellent + // for "lightly threaded" systems (as will be the majority in the + // near future) but makes less effective use of connection resources + // as the "duty factor" goes down because of either long calls or + // bursty contention during numerous short calls to the same server. + // + assert (msg.length > GIOP_HDR_LEN); + + msg.next = msg.buffer; + msg.remaining = GIOP_HDR_LEN; + + char *bufptr = (char _FAR *) msg.buffer; + int len; + + // + // Read the header into the buffer. + // + if ((len = read_buffer (connection, bufptr, GIOP_HDR_LEN)) + != GIOP_HDR_LEN) { + if (len == 0) { // EOF + dmsg1 ("Header EOF ... peer probably aborted connection %d", + connection); - // - // First make sure it's a GIOP message of any version. - // - if (!(msg.buffer [0] == 'G' && msg.buffer [1] == 'I' - && msg.buffer [2] == 'O' && msg.buffer [3] == 'P')) { - env.exception (new CORBA_MARSHAL (COMPLETED_MAYBE)); // header - dmsg ("bad header, magic word"); - return MessageError; + // + // XXX should probably find some way to report this without + // an exception, since for most servers it's not an error. + // Is it _never_ an error? Not sure ... + // + } else if (len < 0) { // error + dsockerr ("GIOP::read_message header"); + } else { // short read ... + dmsg ("read message header failed (short)"); } - - // - // Then make sure the major version is ours, and the minor version is - // one that we understand. - // - if (!(msg.buffer [4] == MY_MAJOR && msg.buffer [5] <= MY_MINOR)) { - env.exception (new CORBA_MARSHAL (COMPLETED_MAYBE)); // header - dmsg ("bad header, version"); - return MessageError; + env.exception (new CORBA_COMM_FAILURE (COMPLETED_MAYBE)); + return MessageError; + } + + // + // NOTE: if message headers, or whome messages, get encrypted in + // application software (rather than by the network infrastructure) + // they should be decrypted here ... + // + + // + // First make sure it's a GIOP message of any version. + // + if (!(msg.buffer [0] == 'G' && msg.buffer [1] == 'I' + && msg.buffer [2] == 'O' && msg.buffer [3] == 'P')) { + env.exception (new CORBA_MARSHAL (COMPLETED_MAYBE)); // header + dmsg ("bad header, magic word"); + return MessageError; + } + + // + // Then make sure the major version is ours, and the minor version is + // one that we understand. + // + if (!(msg.buffer [4] == MY_MAJOR && msg.buffer [5] <= MY_MINOR)) { + env.exception (new CORBA_MARSHAL (COMPLETED_MAYBE)); // header + dmsg ("bad header, version"); + return MessageError; + } + + // + // Get the message type out and adjust the buffer's records to record + // that we've read everything except the length. + // + retval = (GIOP::MsgType) msg.buffer [7]; + msg.skip_bytes (8); + + // + // Make sure byteswapping is done if needed, and then read the message + // size (appropriately byteswapped). + // + msg.do_byteswap = (msg.buffer [6] != MY_BYTE_SEX); + msg.get_ulong (message_size); + + // + // Make sure we have the full length in memory, growing the + // buffer if needed. + // + // NOTE: We could overwrite these few bytes of header... they're + // left around for now as a debugging aid. + // + assert (message_size <= UINT_MAX); + + if ((GIOP_HDR_LEN + message_size) > msg.length) + msg.grow ((size_t) (GIOP_HDR_LEN + message_size)); + + msg.remaining = (size_t) message_size; + bufptr = (char *) & msg.buffer [GIOP_HDR_LEN]; + + // + // Read the rest of this message into the buffer. + // + if ((len = read_buffer (connection, bufptr, (size_t) message_size)) + != (int) message_size) { + if (len == 0) { + dmsg1 ("read message body, EOF on fd %d", connection); + } else if (len < 0) { + dperror ("GIOP::read_message() body"); + } else { + dmsg2 ("short read, only %d of %d bytes", len, message_size); } - // - // Get the message type out and adjust the buffer's records to record - // that we've read everything except the length. - // - retval = (GIOP::MsgType) msg.buffer [7]; - msg.skip_bytes (8); - - // - // Make sure byteswapping is done if needed, and then read the message - // size (appropriately byteswapped). - // - msg.do_byteswap = (msg.buffer [6] != MY_BYTE_SEX); - msg.get_ulong (message_size); - - // - // Make sure we have the full length in memory, growing the - // buffer if needed. - // - // NOTE: We could overwrite these few bytes of header... they're - // left around for now as a debugging aid. - // - assert (message_size <= UINT_MAX); - - if ((GIOP_HDR_LEN + message_size) > msg.length) - msg.grow ((size_t) (GIOP_HDR_LEN + message_size)); - - msg.remaining = (size_t) message_size; - bufptr = (char *) & msg.buffer [GIOP_HDR_LEN]; - - // - // Read the rest of this message into the buffer. - // - if ((len = read_buffer (connection, bufptr, (size_t) message_size)) - != (int) message_size) { - if (len == 0) { - dmsg1 ("read message body, EOF on fd %d", connection); - } else if (len < 0) { - dperror ("GIOP::read_message() body"); - } else { - dmsg2 ("short read, only %d of %d bytes", len, message_size); - } - - // clean up, and ... - env.exception (new CORBA_COMM_FAILURE (COMPLETED_MAYBE)); // body - dmsg ("couldn't read rest of message"); - return MessageError; - } + // clean up, and ... + env.exception (new CORBA_COMM_FAILURE (COMPLETED_MAYBE)); // body + dmsg ("couldn't read rest of message"); + return MessageError; + } - dump_msg ("recv", msg.buffer, (size_t)(message_size + GIOP_HDR_LEN)); - return retval; + dump_msg ("recv", msg.buffer, (size_t)(message_size + GIOP_HDR_LEN)); + return retval; } @@ -743,7 +768,7 @@ GIOP::Invocation::invoke ( // // Send Request, return on error or if we're done // - if (!send_message (stream, endpoint->fd)) { + if (!GIOP::send_message (stream, endpoint->fd)) { // // send_message() closed the connection; we just release it here. // @@ -798,7 +823,7 @@ GIOP::Invocation::invoke ( // all C stack frames must also have their (explicitly coded) handlers // called. We assume a POSIX.1c/C/C++ environment. // - switch (read_message (endpoint->fd, stream, env)) { + switch (GIOP::read_message (endpoint->fd, stream, env)) { case Reply: // handle reply ... must be right one etc break; @@ -1095,7 +1120,7 @@ GIOP::Invocation::invoke ( // void GIOP::incoming_message ( - ACE_HANDLE &fd, + ACE_SOCK_Stream& peer, LocateStatusType check_forward ( opaque &key, CORBA_Object_ptr &objref, @@ -1115,7 +1140,7 @@ GIOP::incoming_message ( unsigned char buffer [CDR::DEFAULT_BUFSIZE]; CDR msg (&buffer [0], sizeof buffer); - switch (read_message (fd, msg, env)) { + switch (read_message (peer, msg, env)) { case Request: { RequestHeader req; @@ -1209,7 +1234,7 @@ GIOP::incoming_message ( CDR::encoder (_tc_CORBA_Object, &fwd_ref, 0, &response, env); CORBA_release (fwd_ref); - (void) send_message (response, fd); + (void) send_message (response, peer); // // ... or report exception that the object doesn't exist. @@ -1221,7 +1246,7 @@ GIOP::incoming_message ( (void) CDR::encoder (_tc_CORBA_OBJECT_NOT_EXIST, &exc, 0, &response, env); - (void) send_message (response, fd); + (void) send_message (response, peer); } delete req.object_key.buffer; @@ -1252,7 +1277,7 @@ GIOP::incoming_message ( // "handle_request" routine puts ReplyStatusType then // parameters. // - (void) send_message (response, fd); + (void) send_message (response, peer); } else handle_request (req, msg, 0, context, env); @@ -1300,7 +1325,7 @@ GIOP::incoming_message ( dmsg ("LocateRequest response: no such object"); } } - (void) send_message (response, fd); + (void) send_message (response, peer); } break; @@ -1345,7 +1370,7 @@ GIOP::incoming_message ( // in case (it'll fail on EOF) and then close the connection. // case MessageError: - send_error (fd); + send_error (peer); break; } diff --git a/TAO/IIOP/lib/giop.hh b/TAO/IIOP/lib/giop.hh index 70431ab9e7b..909cd3196d3 100644 --- a/TAO/IIOP/lib/giop.hh +++ b/TAO/IIOP/lib/giop.hh @@ -21,6 +21,7 @@ #define _GIOP_HH #include <ace/OS.h> +#include <ace/SOCK_Stream.h> #if defined(__IIOP_BUILD) # include "orb.hh" # include "stub.hh" @@ -269,6 +270,7 @@ class GIOP { // namespace // // Close a connection, first sending GIOP::CloseConnection // + static void close_connection (ACE_SOCK_Stream &fd, void *ctx); static void close_connection (ACE_HANDLE &fd, void *ctx); // @@ -287,37 +289,28 @@ class GIOP { // namespace // indicate overal status (LocateStatusType) as well as an objref // in the case of OBJECT_FORWARD. That objref is released. // - static void incoming_message ( - ACE_HANDLE &fd, - LocateStatusType check_forward ( - opaque &key, - CORBA_Object_ptr &objref, - void *context - ), - void handle_request ( - RequestHeader &req, - CDR &request_body, - CDR *reply, - void *context, - CORBA_Environment &env - ), - void *context, - CORBA_Environment &env - ); - - static CORBA_Boolean send_message ( - CDR &stream, - ACE_HANDLE &connection - ); + typedef LocateStatusType (*ForwardFunc)(opaque&,CORBA_Object_ptr&,void*); + typedef void (*RequestHandler)(RequestHeader&,CDR&,CDR*,void*,CORBA_Environment&); + + static void incoming_message(ACE_SOCK_Stream& peer, + ForwardFunc check_forward, + RequestHandler handle_request, + void* context, + CORBA_Environment& env); + static void incoming_message(ACE_HANDLE& fd, + ForwardFunc check_forward, + RequestHandler handle_request, + void* context, + CORBA_Environment& env); + + static CORBA_Boolean send_message (CDR& stream, ACE_SOCK_Stream& peer); + static CORBA_Boolean send_message (CDR& stream, ACE_HANDLE& connection); // // Reads message, returns message type from header // - static MsgType read_message ( - ACE_HANDLE &connection, - CDR &msg, - CORBA_Environment &env - ); + static MsgType read_message (ACE_SOCK_Stream& peer, CDR& msg, CORBA_Environment& env); + static MsgType read_message (ACE_HANDLE& connection, CDR& msg, CORBA_Environment& env); }; #endif // _GIOP_HH diff --git a/TAO/IIOP/lib/orbconf.hh b/TAO/IIOP/lib/orbconf.hh index 8366066d284..8155392a87e 100644 --- a/TAO/IIOP/lib/orbconf.hh +++ b/TAO/IIOP/lib/orbconf.hh @@ -39,7 +39,8 @@ /* Define if your processor stores words with the most significant byte first (like Motorola and SPARC, unlike Intel and VAX). */ #if defined(i386) || defined(_M_IX86) || defined(vax) -# define WORDS_BIGENDIAN 1 +#else +# define WORDS_BIGENDIAN 1 #endif /* The number of bytes in an int. */ diff --git a/TAO/IIOP/lib/roa.cpp b/TAO/IIOP/lib/roa.cpp index 0ddbfd5e8b8..8fda440bfc8 100644 --- a/TAO/IIOP/lib/roa.cpp +++ b/TAO/IIOP/lib/roa.cpp @@ -67,13 +67,13 @@ ROA_Handler::handle_input(ACE_HANDLE fd) // to dynamically allocate memory. // CORBA_Environment env; - TCP_OA::dispatch_context ctx; + Dispatch_Context ctx; ctx.skeleton = ACE_ROA::upcall(); ctx.context = ACE_ROA::context(); ctx.check_forward = ACE_ROA::forwarder(); ctx.oa = ACE_ROA::oa(); - ctx.endpoint = fd; + ctx.endpoint = peer(); #ifdef _POSIX_THREADS ctx.aggressive = CORBA_B_FALSE; diff --git a/TAO/IIOP/lib/roa.hh b/TAO/IIOP/lib/roa.hh index 1ca7ad1c096..6b221d1a786 100644 --- a/TAO/IIOP/lib/roa.hh +++ b/TAO/IIOP/lib/roa.hh @@ -72,7 +72,7 @@ public: virtual int open(void*); virtual int svc(void); protected: - virtual int handle_input(ACE_HANDLE); + virtual int handle_input(ACE_HANDLE = ACE_INVALID_HANDLE); virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask); }; diff --git a/TAO/IIOP/lib/tcpoa.cpp b/TAO/IIOP/lib/tcpoa.cpp index d18f79569a8..3dd8046ca8e 100644 --- a/TAO/IIOP/lib/tcpoa.cpp +++ b/TAO/IIOP/lib/tcpoa.cpp @@ -82,9 +82,9 @@ tcp_oa_dispatcher ( request_tsd = &req; #endif // _POSIX_THREADS - TCP_OA::dispatch_context* helper; + Dispatch_Context* helper; - helper = (TCP_OA::dispatch_context*) context; + helper = (Dispatch_Context*) context; helper->skeleton (req.object_key, svr_req, helper->context, env); svr_req.release (); @@ -190,10 +190,10 @@ tcp_oa_forwarder ( void *ctx ) { - TCP_OA::dispatch_context *helper; + Dispatch_Context *helper; CORBA_Environment env; - helper = (TCP_OA::dispatch_context *) ctx; + helper = (Dispatch_Context *) ctx; assert (helper->check_forward != 0); helper->check_forward (target_key, forward_reference, helper->context, env); @@ -475,7 +475,7 @@ TCP_OA::get_client_principal ( // Generic routine to handle a message. // void -TCP_OA::handle_message (dispatch_context& ctx, CORBA_Environment& env) +TCP_OA::handle_message (Dispatch_Context& ctx, CORBA_Environment& env) { GIOP::incoming_message (ctx.endpoint, ctx.check_forward ? tcp_oa_forwarder : 0, @@ -510,8 +510,9 @@ void * TCP_OA::worker (void *arg) { CORBA_Environment env; - dispatch_context *context = (dispatch_context *)arg; + Dispatch_Context *context = (Dispatch_Context *)arg; +#if 0 do { dmsg1 ("worker starting on FD %d", context->endpoint->fd); context->oa->handle_message (*context, env); @@ -522,6 +523,7 @@ TCP_OA::worker (void *arg) } } while (context->aggressive && context->endpoint != -1); +#endif delete context; return 0; } @@ -707,9 +709,9 @@ TCP_OA::get_request ( // connection resource too -- when this dispatch context gets // destroyed, only then is the connection released. // - dispatch_context *ctx; + Dispatch_Context* ctx; - ctx = new dispatch_context; + ctx = new Dispatch_Context; ctx->skeleton = handler; ctx->check_forward = check_forward; ctx->context = app_state; @@ -754,7 +756,7 @@ TCP_OA::get_request ( // Handle it in this thread. We can do it without any need // to dynamically allocate memory. // - dispatch_context ctx; + Dispatch_Context ctx; ctx.skeleton = handler; ctx.check_forward = check_forward; diff --git a/TAO/IIOP/lib/tcpoa.hh b/TAO/IIOP/lib/tcpoa.hh index 69da4d9c2d7..c7568890462 100644 --- a/TAO/IIOP/lib/tcpoa.hh +++ b/TAO/IIOP/lib/tcpoa.hh @@ -37,6 +37,26 @@ typedef TCP_OA *TCP_OA_ptr; extern const IID IID_TCP_OA; +// +// Data structure passed as "context" to the GIOP code, which then +// calls back one of the two helper routines as part of handling any +// particular incoming request. +// +struct Dispatch_Context +{ + TOA::dsi_handler skeleton; + void (*check_forward) (CORBA_OctetSeq& key, + CORBA_Object_ptr& fwd_ref, + void* context, + CORBA_Environment& env); + void* context; + TCP_OA* oa; + ACE_SOCK_Stream endpoint; // NOTE!!! This type MUST match that used for ROA_Handler! +#ifdef _POSIX_THREADS + CORBA_Boolean aggressive; +#endif // _POSIX_THREADS +}; + class _EXPCLASS TCP_OA : public TOA { public: @@ -156,25 +176,6 @@ public: // PRIVATE: - // - // Data structure passed as "context" to the GIOP code, which then - // calls back one of the two helper routines as part of handling any - // particular incoming request. - // - struct dispatch_context - { - TOA::dsi_handler skeleton; - void (*check_forward) (CORBA_OctetSeq& key, - CORBA_Object_ptr& fwd_ref, - void* context, - CORBA_Environment& env); - void* context; - TCP_OA* oa; - ACE_HANDLE endpoint; -#ifdef _POSIX_THREADS - CORBA_Boolean aggressive; -#endif // _POSIX_THREADS - }; private: friend class ROA_Handler; // needed so it can call handle_message() @@ -193,7 +194,7 @@ private: // Used internally by threaded (and unthreaded) code to // dispatch incoming GIOP messages // - void handle_message (dispatch_context& context, CORBA_Environment& env); + void handle_message (Dispatch_Context& context, CORBA_Environment& env); #ifdef _POSIX_THREADS // |