// $Id$ // // @(#)giop.cpp 1.10 95/09/21 // Copyright 1994-1995 by Sun Microsystems Inc. // All Rights Reserved // // GIOP: Utility routines for sending, receiving GIOP messages // // Note that the Internet IOP is just the TCP-specific mapping of the // General IOP. Areas where other protocols may map differently // include use of record streams (TCP has none), orderly disconnect // (TCP has it), endpoint addressing (TCP uses host + port), security // (Internet security should be leveraged by IIOP) and more. // // NOTE: There are a few places where this code knows that it's really // talking IIOP instead of GIOP. No rush to fix this so long as we // are really not running atop multiple connection protocols. // // THREADING NOTE: currently, the connection manager eliminates tricky // threading issues by providing this code with the same programming // model both in threaded and unthreaded environments. Since the GIOP // APIs were all designed to be reentrant, this makes threading rather // simple! // // That threading model is that the thread making (or handling) a call // is given exclusive access to a connection for the duration of a // call, so that no multiplexing or demultiplexing is needed. That // is, locking is at the "connection level" rather than "message // level". // // The down side of this simple threading model is that utilization of // system resources (mostly connections, but to some extent network // I/O) in some kinds of environments can be inefficient. However, // simpler threading models are much easier to get properly debugged, // and often perform better. Also, such environments haven't been // seen to be any kind of problem; the model can be changed later if // needed, it's just an internal implementation detail. Any portable // ORB client is not allowed to rely on semantic implications of such // a model. // // XXX there is lots of unverified I/O here. In all cases, if an // error is detected when marshaling or unmarshaling, it should be // reported. #include "tao/corba.h" #include "tao/Timeprobe.h" static const char digits [] = "0123456789ABCD"; static const char *names [] = { "Request", "Reply", "CancelRequest", "LocateRequest", "LocateReply", "CloseConnection", "MessageError", "EndOfFile" }; static void dump_msg (const char *label, const u_char *ptr, size_t len) { if (TAO_debug_level >= 2) { ACE_DEBUG ((LM_DEBUG, "%s GIOP v%c.%c msg, %d data bytes, %s endian, %s\n", label, digits[ptr[4]], digits[ptr[5]], len - TAO_GIOP_HEADER_LEN, (ptr[6] == TAO_ENCAP_BYTE_ORDER) ? "my" : "other", (ptr[7] <= TAO_GIOP_MessageError) ? names [ptr[7]] : "UNKNOWN TYPE")); if (TAO_debug_level >= 4) ACE_HEX_DUMP ((LM_DEBUG, (const char*)ptr, len, "(%P|%t) data bytes\n")); } } CORBA::Boolean TAO_GIOP::send_request (TAO_SVC_HANDLER *handler, CDR &stream) { ACE_TIMEPROBE (" -> GIOP::send_request - start"); char *buf = (char *) stream.buffer (); size_t buflen = stream.length (); // assert (buflen == (stream.length - stream.remaining)); // Patch the message length in the GIOP header; it's always at the // same eight byte offset into the message. // // NOTE: Here would also be a fine place to calculate a digital // signature for the message and place it into a preallocated slot // in the "ServiceContext". Similarly, this is a good spot to // encrypt messages (or just the message bodies) if that's needed in // this particular environment and that isn't handled by the // networking infrastructure (e.g. IPSEC). *(CORBA::Long *) (buf + 8) = (CORBA::Long) (buflen - TAO_GIOP_HEADER_LEN); // Strictly speaking, should not need to loop here because the // socket never gets set to a nonblocking mode ... some Linux // versions seem to need it though. Leaving it costs little. dump_msg ("send", ACE_reinterpret_cast(u_char*,buf), buflen); ACE_SOCK_Stream &peer = handler->peer (); while (buflen > 0) { if (buflen > stream.length ()) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) ?? writebuf, buflen %u > length %u\n", buflen, stream.length ())); ACE_TIMEPROBE (" -> GIOP::send_request - fail"); return CORBA::B_FALSE; } ssize_t writelen = peer.send_n (buf, buflen); #if defined (DEBUG) // dmsg_filter (6, "wrote %d bytes to connection %d", // writelen, connection); dmsg_filter (6, "wrote %d bytes", writelen); #endif /* DEBUG */ assert ((writelen >= 0 && ((size_t)writelen) <= buflen) || writelen == -1); // On error or EOF, report the fault, close the connection, and // mark it as unusable/defunct. // // XXX on client side write errors, we may hit the case that the // server did a clean shutdown but we've not yet read the // GIOP::CloseConnection message. If we get an error, we need // to see if there is such a message waiting for us, and if so // we should cause (full) rebinding to take place. if (writelen == -1) { ACE_DEBUG ((LM_ERROR, "(%P|%t) %p\n", "OutgoingMessage::writebuf ()")); ACE_DEBUG ((LM_DEBUG, "(%P|%t) closing conn %d after fault\n", peer.get_handle ())); handler->close (); ACE_TIMEPROBE (" -> GIOP::send_request - fail"); return CORBA::B_FALSE; } else if (writelen == 0) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) OutgoingMessage::writebuf () ... EOF, closing conn %d\n", peer.get_handle ())); handler->close (); ACE_TIMEPROBE (" -> GIOP::send_request - fail"); return CORBA::B_FALSE; } if ((buflen -= writelen) != 0) buf += writelen; #if defined (DEBUG) // // NOTE: this should never be seen. However, on Linux // it's been seen with UNIX domain sockets. // if (buflen) dmsg_filter (8, "%u more bytes to write...\n", buflen); #endif /* DEBUG */ } ACE_TIMEPROBE (" -> GIOP::send_request - done"); return CORBA::B_TRUE; } // Server sends an "I'm shutting down now, any requests you've sent me // can be retried" message to the server. The message is prefab, for // simplicity. // // NOTE: this is IIOP-specific though it doesn't look like it is. It // relies on a TCP-ism: orderly disconnect, which doesn't exist in all // transport protocols. Versions of GIOP atop some transport that's // lacking orderly disconnect must define some transport-specific // handshaking (e.g. the XNS/SPP handshake convention) in order to // know that the same transport semantics are provided when shutdown // is begun with messages "in flight". (IIOP doesn't report false // errors in the case of "clean shutdown", because it relies on // orderly disconnect as provided by TCP. This quality of service is // required to write robust distributed systems.) static const char close_message [TAO_GIOP_HEADER_LEN] = { 'G', 'I', 'O', 'P', MY_MAJOR, MY_MINOR, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_CloseConnection, 0, 0, 0, 0 }; void TAO_GIOP::close_connection (TAO_Client_Connection_Handler *&handler, void *) { // It's important that we use a reliable shutdown after we send this // message, so we know it's received. // // XXX should recv and discard queued data for portability; note // that this won't block (long) since we never set SO_LINGER dump_msg ("send", (const u_char *) close_message, TAO_GIOP_HEADER_LEN); handler->peer ().send (close_message, TAO_GIOP_HEADER_LEN); ACE_HANDLE which = handler->peer ().get_handle (); handler->close (); handler = 0; ACE_DEBUG ((LM_DEBUG, "(%P|%t) shut down socket %d\n", which)); } // Send an "I can't understand you" message -- again, the message is // prefabricated for simplicity. This implies abortive disconnect (at // the application level, if not at the level of TCP). // // NOTE that IIOP will still benefit from TCP's orderly disconnect. static const char error_message [TAO_GIOP_HEADER_LEN] = { 'G', 'I', 'O', 'P', MY_MAJOR, MY_MINOR, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_MessageError, 0, 0, 0, 0 }; static inline void send_error (TAO_Client_Connection_Handler *&handler) { dump_msg ("send", (const u_char *) error_message, TAO_GIOP_HEADER_LEN); handler->peer ().send_n (error_message, TAO_GIOP_HEADER_LEN); ACE_HANDLE which = handler->peer ().get_handle (); handler->close (); handler = 0; ACE_DEBUG ((LM_DEBUG, "(%P|%t) aborted socket %d\n", which)); } // Loop on data read ... this is required with some implementations of // sockets (e.g. winsock, HP/UX) since even when async mode is not // set, recv () won't block until the requested amount of data is // available. static int read_buffer (ACE_SOCK_Stream &peer, char *buf, size_t len) { int bytes_read = 0; bytes_read = peer.recv_n (buf, len); if (bytes_read == -1 && errno == ECONNRESET) { // We got a connection reset (TCP RSET) from the other side, // i.e., they didn't initiate a proper shutdown. // // Make it look like things are OK to the upper layer. bytes_read = 0; errno = 0; } return bytes_read; } // Read the message header, plus any data part of the message, setting // stuff up so that CDR byteswaps data as appropriate. Errors are // reported to be MessageError messages. // // NOTE: this code is structured to issue two read () calls for each // incoming message. Alternative structures (e.g. with a user-space // buffer per connection, or networking code handing off entire GIOP // messages) can reduce the overhead of these calls to the networking // code; correctness and simplicity drove this implementation more // than efficiency. // // NOTE: as always, counting system calls associated with I/O gives // you a good basic understanding of the tuning issues. On the server // side, there is normally select/read/read/write per invocation. The // call to select () can be omitted by allocating a thread to each // connection; in some cases, that alone has almost doubled // performance. The two read () calls can be made into one by fancy // buffering. How fast could it be with both optimizations applied? TAO_GIOP_MsgType TAO_GIOP::recv_request (TAO_SVC_HANDLER *&handler, CDR &msg, CORBA::Environment &env) { ACE_TIMEPROBE (" -> GIOP::recv_request - start"); TAO_GIOP_MsgType retval; CORBA::ULong message_size; ACE_SOCK_Stream &connection = handler->peer (); // Read the message header off the wire. // // THREADING NOTE: the connection manager handed us this connection // for exclusive use, so we need not worry about having two threads // interleave reads of partial messages. This model is excellent // for "lightly threaded" systems (as will be the majority in the // near future) but makes less effective use of connection resources // as the "duty factor" goes down because of either long calls or // bursty contention during numerous short calls to the same server. msg.reset (); if (msg.grow (TAO_GIOP_HEADER_LEN) == 0) { env.exception (new CORBA::NO_MEMORY (CORBA::COMPLETED_MAYBE)); return TAO_GIOP_MessageError; } assert (msg.size () > TAO_GIOP_HEADER_LEN); char *bufptr = (char*) msg.buffer (); ssize_t len = read_buffer (connection, bufptr, TAO_GIOP_HEADER_LEN); // Read the header into the buffer. if (len != TAO_GIOP_HEADER_LEN) { switch (len) { case 0: ACE_DEBUG ((LM_DEBUG, "(%P|%t) Header EOF ... peer probably aborted connection %d\n", connection.get_handle ())); ACE_TIMEPROBE (" -> GIOP::recv_request - EOF"); return TAO_GIOP_EndOfFile; // XXX should probably find some way to report this without // an exception, since for most servers it's not an error. // Is it _never_ an error? Not sure ... /* NOTREACHED */ case -1: // error ACE_DEBUG ((LM_ERROR, "(%P|%t) GIOP::recv_request header socket error %p\n", "read_buffer")); break; /* NOTREACHED */ default: ACE_DEBUG ((LM_ERROR, "(%P|%t) GIOP::recv_request header read failed, only %d of %d bytes\n", len, TAO_GIOP_HEADER_LEN)); break; /* NOTREACHED */ } env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_MAYBE)); ACE_TIMEPROBE (" -> GIOP::recv_request - fail"); return TAO_GIOP_MessageError; } // Set the end of the message.... msg.wr_ptr (TAO_GIOP_HEADER_LEN); // NOTE: if message headers, or whome messages, get encrypted in // application software (rather than by the network infrastructure) // they should be decrypted here ... // First make sure it's a GIOP message of any version. if (!(bufptr [0] == 'G' && bufptr [1] == 'I' && bufptr [2] == 'O' && bufptr [3] == 'P')) { env.exception (new CORBA::MARSHAL (CORBA::COMPLETED_MAYBE)); // header ACE_DEBUG ((LM_DEBUG, "bad header, magic word\n")); ACE_TIMEPROBE (" -> GIOP::recv_request - fail"); return TAO_GIOP_MessageError; } // Then make sure the major version is ours, and the minor version // is one that we understand. if (!(bufptr [4] == MY_MAJOR && bufptr [5] <= MY_MINOR)) { env.exception (new CORBA::MARSHAL (CORBA::COMPLETED_MAYBE)); // header ACE_DEBUG ((LM_DEBUG, "bad header, version\n")); ACE_TIMEPROBE (" -> GIOP::recv_request - fail"); return TAO_GIOP_MessageError; } // Get the message type out and adjust the buffer's records to record // that we've read everything except the length. retval = (TAO_GIOP_MsgType) bufptr[7]; msg.do_byteswap = (bufptr [6] != TAO_ENCAP_BYTE_ORDER); // Make sure byteswapping is done if needed, and then read the // message size (appropriately byteswapped). msg.rd_ptr (8); msg.get_ulong (message_size); // Make sure we have the full length in memory, growing the buffer // if needed. // // NOTE: We could overwrite these few bytes of header... they're // left around for now as a debugging aid. assert (message_size <= UINT_MAX); if (msg.grow (TAO_GIOP_HEADER_LEN + message_size) == 0) { env.exception (new CORBA::NO_MEMORY (CORBA::COMPLETED_MAYBE)); return TAO_GIOP_MessageError; } // The offset from the current position were data writing should // start, since we already read the msg length (4 bytes) starting at // position 8 we need to substract 12 bytes. const int offset = TAO_GIOP_HEADER_LEN - 12; bufptr = (char *)msg.buffer() + offset; // Read the rest of this message into the buffer. len = read_buffer (connection, bufptr, (size_t) message_size); if (len != (ssize_t) message_size) { switch (len) { case 0: ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_GIOP::recv_request body, EOF on handle %d\n", connection.get_handle ())); break; /* NOTREACHED */ case -1: ACE_DEBUG ((LM_ERROR, "(%P|%t) TAO_GIOP::recv_request () body %p\n", "read_buffer")); break; /* NOTREACHED */ default: ACE_DEBUG ((LM_ERROR, "(%P|%t) short read, only %d of %d bytes\n", len, message_size)); break; /* NOTREACHED */ } // clean up, and ... env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_MAYBE)); // body ACE_DEBUG ((LM_DEBUG, "couldn't read rest of message\n")); ACE_TIMEPROBE (" -> GIOP::recv_request - fail"); return TAO_GIOP_MessageError; } // Set the end of the message.... msg.wr_ptr (message_size); dump_msg ("recv", ACE_reinterpret_cast(u_char*,msg.buffer()), (size_t) (message_size + TAO_GIOP_HEADER_LEN)); ACE_TIMEPROBE (" -> GIOP::recv_request - done"); return retval; } // Normal invocations don't involve any heap allocation; messages are // constructed into stack-based buffers and are read into those // buffers too. Larger buffers are heap-allocated as needed. // // The constraint on request IDs is that no two requests from the same // client with the same ID are outstanding at the same time. In // single threaded environments, this is met by any number whatever. // When multiple threads are used, we eliminate the need for any // locked state by using the thread ID as the request ID, since any // given thread has at most one request outstanding at a time. // // NOTE: this means that if "deferred synchronous" calls get // supported, it's done by creating a thread internally to make the // call. That is less disruptive (and error prone) in general than // restructuring an ORB core in terms of asynchrony. TAO_GIOP_Invocation::TAO_GIOP_Invocation (IIOP_Object *data, const char *operation, CORBA::Boolean is_roundtrip) : data_ (data), opname_ (operation), do_rsvp_ (is_roundtrip), my_request_id_ (0), stream_ (buffer, sizeof buffer), handler_ (0) { // The assumption that thread ids are ints is false and horribly // implementation-dependent, so this code just sucks. But, at least // it will compile on multiple platforms through the magic of ACE // :-/ //assert (sizeof (CORBA::ULong) == sizeof (ACE_thread_t)); ACE_thread_t me = ACE_OS::thr_self (); // Copy in only as many bytes are valid, or only as many as we have // room for, whichever is less. -------> What a friggin' HACK!?!?! ACE_OS::memcpy (&this->my_request_id_, &me, ACE_MIN (sizeof (me), sizeof (this->my_request_id_))); } TAO_GIOP_Invocation::~TAO_GIOP_Invocation (void) { if (this->handler_ != 0) this->handler_->idle (); } // Octet codes for the parameters of the "Opaque" (sequence of octet) // data type used various places internally ... a CDR encapsulation // holding two parameters (like all sequence TypeCodes). // // NOTE: this **MUST** be longword aligned, which is why it's coded as // a longword array not an octet array. Just sticking a long in for // padding won't work with compilers that optimize unused data out of // existence. static const CORBA::Long _oc_opaque [] = { // CDR typecode octets TAO_ENCAP_BYTE_ORDER, // native endian + padding; "tricky" 10, // ... (sequence of) octets 0 // ... unbounded }; CORBA::TypeCode TC_opaque (CORBA::tk_sequence, sizeof _oc_opaque, (char *) &_oc_opaque, CORBA::B_FALSE); // Octet codes for the parameters of the ServiceContextList TypeCode // ... this is a CDR encapsulation holding two parameters (like all // sequences): a TypeCode, and the bounds of the sequence (zero in // this case). // // This is complicated since the Typecode for the data type for the // sequence members is complex, a structure that nests two further // typecodes (one is a sequence). // // NOTE: this must be longword aligned! static const CORBA::Long _oc_svc_ctx_list [] = { // START bytes of encapsulation 0 TAO_ENCAP_BYTE_ORDER, // native endian + padding; "tricky" // // FIRST sequence param: typecode for struct is complex, // and so uses a nested encapsulation. // CORBA::tk_struct, 72, // length of encapsulation 1 // START bytes of encapsulation 1 (struct params) 1, // native endian + padding; "tricky" 1, 0, // type ID omitted: null string 1, 0, // name omitted "ServiceContext" 2, // two struct elements // First structure element: name, typecode for ULong // // NOTE: to be more strictly correct this could be a CORBA::tk_alias // typecode ... 1, 0, // name omitted: "context_id" CORBA::tk_long, // Second structure element: name, typecode for sequence of octet; // the typecode for sequence of octet is complex, there's a second // level of nested encapuslation here. 1, 0, // name omitted: "context_data" CORBA::tk_sequence, // sequence typecode 16, // length of encapsulation 2 // START bytes of encapsulation 2 (sequence params) 1, // native endian + padding; "tricky" 1, 0, // type ID omitted: null string CORBA::tk_octet, // (sequence of) octet 0, // ... unbounded length // END bytes of encapsulation 2 (sequence params) // END bytes of encapsulation 1 (struct params) // SECOND sequence param: bound of sequence (none) 0 // unbounded seq of ServiceContext // END bytes of encapsulation 0 (sequence params) }; CORBA::TypeCode TC_ServiceContextList (CORBA::tk_sequence, sizeof _oc_svc_ctx_list, (char *) &_oc_svc_ctx_list, CORBA::B_FALSE); // The public API involves creating an invocation, starting it, filling // in request parameters, actually performing the invocation, getting // response parameters, and then cleaning up. Sometimes they must be // restarted (e.g. request forwarding). This is the start/restart entry. void TAO_GIOP_Invocation::start (CORBA::Environment &env) { const TAO_opaque *key; // First try to bind to the appropriate address. We do that here // since we may get forwarded to a different objref in the course of // any given call, with new start () call each time. It's not // cached in the objref data since the connections change // asynchronously from objref invocations and this simplifies // connection management. // // THREADING NOTE: this connection is reserved to this call. Also, // starting at this point in the call, new forwarding information // will not be used until/unless the call is reissued. Correctness // is not affected, the call will just be forwarded later than it // might be in a more complex implementation. assert (data_ != 0); // Get a pointer to the connector, which might be in thread-specific // storage, depending on the concurrency model. TAO_CONNECTOR *con = TAO_ORB_Core_instance ()->connector (); // Determine the object key and the address to which we'll need a // connection. ACE_INET_Addr *server_addr_p = 0; { // Begin a new scope so we keep this lock only as long as // necessary #if 0 /* Keep this around for when forwarding might be implemented (if ever) */ ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, guard, data_->fwd_profile_lock ())); #endif if (data_->fwd_profile_i () != 0) { key = &data_->fwd_profile_i ()->object_key; server_addr_p = &data_->fwd_profile_i ()->object_addr (); } else { key = &data_->profile.object_key; server_addr_p = &data_->profile.object_addr (); } } if (server_addr_p == 0) { env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); return; } this->handler_ = 0; // Must reset handler, otherwise, will // complain. // Establish the connection and get back a // . if (con->connect (this->handler_, *server_addr_p) == -1) { // Give users a clue to the problem. ACE_DEBUG ((LM_ERROR, "(%P|%t) %s:%u, connection to " "%s (%s):%hu failed (%p)\n", __FILE__, __LINE__, server_addr_p->get_host_name (), server_addr_p->get_host_addr (), server_addr_p->get_port_number (), "errno")); // There might be a better exception to set, but it's unclear // which one should be used. This one applies, even if it's // vague. env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); return; } // Use the ACE_SOCK_Stream from the Client_Connection_Handler for // communication inplace of the endpoint used below. // POLICY DECISION: If the client expects most agents to forward, // then it could try to make sure that it's been forwarded at least // once by eliciting it with a LocateRequest message. (Further // hinting in the IIOP::ProfileData could help!) // // That scenario does not match an "Inter" ORB Protocol well, since // bridges chain calls rather than forwarding them. It does match // some kinds of "Intra" ORB scenarios well, with many agents that // spawn new processes talking to their clients across the net. // // At this time, the policy noted above is followed in the sense // that this software does NOT expect most agents to forward, so it // doesn't bother to probe. Correctness is not affected; this is // only a quality-of-service policy. It affects mostly performance, // but the "best efforts" semantics for "oneway" messages would also // be impacted in that some (by definition, buggy!) code which used // only "oneway" messages might not work at all. // Build the outgoing message, starting with generic GIOP header. CORBA::Boolean bt = TAO_GIOP::start_message (TAO_GIOP_Request, this->stream_); if (bt != CORBA::B_TRUE) { env.exception (new CORBA::MARSHAL (CORBA::COMPLETED_NO)); return; } // Then fill in the rest of the RequestHeader // // The first element of header is service context list; // transactional context would be acquired here using the // transaction service APIs. Other kinds of context are as yet // undefined. // // Last element of request header is the principal; no portable way // to get it, we just pass empty principal (convention: indicates // "anybody"). Steps upward in security include passing an // unverified user ID, and then verifying the message (i.e. a dummy // service context entry is set up to hold a digital signature for // this message, then patched shortly before it's sent). static CORBA::Principal_ptr anybody = 0; static TAO_GIOP_ServiceContextList svc_ctx; // all zeroes if (this->stream_.encode (&TC_ServiceContextList, 0, &svc_ctx, env) != CORBA::TypeCode::TRAVERSE_CONTINUE) return; if (!this->stream_.put_ulong (this->my_request_id_) || !this->stream_.put_boolean (this->do_rsvp_)) { env.exception (new CORBA::MARSHAL (CORBA::COMPLETED_NO)); return; } if (this->stream_.encode (&TC_opaque, key, 0, env) != CORBA::TypeCode::TRAVERSE_CONTINUE || this->stream_.encode (CORBA::_tc_string, &opname_, 0, env) != CORBA::TypeCode::TRAVERSE_CONTINUE || this->stream_.encode (CORBA::_tc_Principal, &anybody, 0, env) != CORBA::TypeCode::TRAVERSE_CONTINUE) return; // right after fault else return; // no fault reported } extern CORBA::ExceptionList __system_exceptions; // Send request, block until any reply comes back, and unmarshal reply // parameters as appropriate. static inline const char* TAO_GIOP_message_name (TAO_GIOP_MsgType which) { static const char* msgnames[] = { "EndOfFile (nonstd)", "Request (client)", "Reply (server)", "CancelRequest (client)", "LocateRequest (client)", "LocateReply (server)", "CloseConnection (server)", "MessageError (either)" }; int i = (int)which; i++; // Add one since EndOfFile is -1 if (i > (int) (sizeof(msgnames)/sizeof(msgnames[0]))) return ""; else return msgnames[i]; } TAO_GIOP_ReplyStatusType TAO_GIOP_Invocation::invoke (CORBA::ExceptionList &exceptions, CORBA::Environment &env) { // Send Request, return on error or if we're done if (this->handler_->send_request (this->stream_, this->do_rsvp_) == -1) { // send_request () closed the connection; we just set the // handler to 0 here. this->handler_ = 0; // // XXX highly desirable to know whether we wrote _any_ data; if // we wrote none, then there's no chance the call completed and // applications don't have to deal with those nasty // indeterminate states where they can't immediatly tell if // what's safe to do. // // XXX also, there might have been a GIOP::CloseConnection // message in the input queue. If so, this request should be // treated as a (full) "rebind" case. Can't do that from this // point in the code however! Some minor restructuring needs to // happen. // env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_MAYBE)); return TAO_GIOP_SYSTEM_EXCEPTION; } if (!this->do_rsvp_) return TAO_GIOP_NO_EXCEPTION; // This blocks until the response is read. In the current version, // there is only one client thread that ever uses this connection, // so most response messages are illegal. // // THREADING NOTE: to make more efficient use of connection // resources, we'd multiplex I/O on connections. For example, one // thread would write its GIOP::Request (or GIOP::LocateRequest etc) // message and block for the response, then another would do the // same thing. When a response came back, it would be handed to the // thread which requested it. // // Currently the connection manager doesn't support such fine // grained connection locking, and also this server implementation // wouldn't take advantage of that potential concurrency in requests // either. There are often performance losses coming from // fine-grained locks being used inappropriately; there's some // evidence that locking at the level of requests loses on at least // some platforms. // // XXX In all MT environments, there's a cancellation point lurking // here; need to investigate. Client threads would frequently be // canceled sometime during recv_request ... the correct action to // take on being canceled is to issue a CancelRequest message to the // server and then imediately let other client-side cancellation // handlers do their jobs. // // In C++, that basically means to unwind the stack using almost // normal procedures: all destructors should fire, and some "catch" // blocks should probably be able to handle things like releasing // pointers. (Without unwinding the C++ stack, resources that must // be freed by thread cancellation won't be freed, and the process // won't continue to function correctly.) The tricky part is that // according to POSIX, all C stack frames must also have their // (explicitly coded) handlers called. We assume a POSIX.1c/C/C++ // environment. TAO_SVC_HANDLER *handler = this->handler_; TAO_GIOP_MsgType m = TAO_GIOP::recv_request (handler, this->stream_, env); switch (m) { case TAO_GIOP_Reply: // handle reply ... must be right one etc break; case TAO_GIOP_CloseConnection: // Special case of forwarding -- server was closing the // connection, which just indicates resource constraints, not an // error. The client is effectively "forwarded" to the same // server! // // However, we must reinitialize the forwarding chain, since the // resource being reclaimed might also have been the process, // not just the connection. Without reinitializing, we'd give // false error reports to applications. { #if 0 /* Keep this around in case forwarding is ever implemented */ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, data_->fwd_profile_lock (), TAO_GIOP_SYSTEM_EXCEPTION)); #endif IIOP::Profile *old = data_->fwd_profile_i (0); delete old; this->handler_->close (); this->handler_ = 0; return TAO_GIOP_LOCATION_FORWARD; } case TAO_GIOP_Request: case TAO_GIOP_CancelRequest: case TAO_GIOP_LocateRequest: case TAO_GIOP_LocateReply: default: // These are all illegal messages to find. If found, they could // be indicative of client bugs (lost track of input stream) or // server bugs; maybe the request was acted on, maybe not, we // can't tell. ACE_DEBUG ((LM_DEBUG, "(%P|%t) illegal GIOP message (%s) in response to my Request!\n", TAO_GIOP_message_name(m))); env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_MAYBE)); // FALLTHROUGH ... case TAO_GIOP_MessageError: // Couldn't read it for some reason ... exception's set already, // so just tell the other end about the trouble (closing the // connection) and return. send_error (this->handler_); return TAO_GIOP_SYSTEM_EXCEPTION; } // Process reply message. Again, due to the single threading in // this code, only the reply to this request is allowed to be coming // back. // // NOTE: if the response really _isn't_ for this thread, it's now // treated as an error in which synchronization can't be recovered. // There might be cases where it _could_ be recovered ... e.g. maybe // for some reason the previous call couldn't pick up its response. // It'd be worth investigating (and handling) any such cases. // // NOTE: since this implementation supports no ORB services // (notably, the transaction service, which is the only one that's // currently defined), the reply context is discarded. Normally // it'd be fed, component at a time, to the relevant services. // // NOTE: As security support kicks in, this is the right place to // verify a digital signature, if that is required in this // particular runtime security environment. How to know if that's // the case? It's likely that standard Internet IPSEC // infrastructure (RFC 1825 through 1827, and successors) will be // used to enforce many security policies; integrity and privacy // guarantees may be provided by the network, and need no support // here. TAO_GIOP_ServiceContextList reply_ctx; CORBA::ULong request_id; CORBA::ULong reply_status; // TAO_GIOP_ReplyStatusType if (this->stream_.decode (&TC_ServiceContextList, &reply_ctx, 0, env) != CORBA::TypeCode::TRAVERSE_CONTINUE) { send_error (this->handler_); return TAO_GIOP_SYSTEM_EXCEPTION; } if (!this->stream_.get_ulong (request_id) || request_id != this->my_request_id_ || !this->stream_.get_ulong (reply_status) || reply_status > TAO_GIOP_LOCATION_FORWARD) { send_error (this->handler_); env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_MAYBE)); ACE_DEBUG ((LM_DEBUG, "(%P|%t) bad Response header\n")); return TAO_GIOP_SYSTEM_EXCEPTION; } // If there was no exception, let the caller parse the normal // response. Otherwise parse and handle the response; we always // know how to deal with the standard exceptions, and the caller // provides a list of allowed user-defined exceptions so that we // know how to unmarshal those too (without IFR consultation). // // When requests are forwarded, we just store the revised profile // data in this objref structure. The expectation is that the call // will be reissued until someone gives up on a forwarding chain, // and that other calls will reap the benefit of the forwarding work // by this thread. // // NOTE: should ensure that from here on, all system exceptions // return COMPLETED_YES status ... even ones reported by code which // we call. switch (reply_status) { case TAO_GIOP_NO_EXCEPTION: break; case TAO_GIOP_USER_EXCEPTION: case TAO_GIOP_SYSTEM_EXCEPTION: { char* buf; // Pull the exception ID out of the marshaling buffer. { if (this->stream_.get_string (buf) == CORBA::B_FALSE) { send_error (this->handler_); env.exception (new CORBA::MARSHAL (CORBA::COMPLETED_YES)); return TAO_GIOP_SYSTEM_EXCEPTION; } } // User and system exceptions differ only in what table of // exception typecodes is searched. CORBA::ExceptionList *xlist; if (reply_status == TAO_GIOP_USER_EXCEPTION) xlist = &exceptions; else xlist = &__system_exceptions; // Find it in the operation description and then use that to get // the typecode. Use it to unmarshal the exception's value; if // that exception is not allowed by this operation, fail (next). u_int i; CORBA::TypeCode_ptr *tcp; for (i = 0, tcp = xlist->buffer; i < xlist->length; i++, tcp++) { const char *xid; xid = (*tcp)->id (env); if (env.exception () != 0) { dexc (env, "invoke (), get exception ID"); send_error (this->handler_); return TAO_GIOP_SYSTEM_EXCEPTION; } if (ACE_OS::strcmp (buf, (char *)xid) == 0) { size_t size; CORBA::Exception *exception; size = (*tcp)->size (env); if (env.exception () != 0) { dexc (env, "invoke (), get exception size"); send_error (this->handler_); return TAO_GIOP_SYSTEM_EXCEPTION; } // Create the exception, fill in the generic parts // such as vtable, typecode ptr, refcount ... we need // to clean them all up together, in case of errors // unmarshaling. exception = new (new char [size]) CORBA::Exception (*tcp); if (this->stream_.decode (*tcp, exception, 0, env) != CORBA::TypeCode::TRAVERSE_CONTINUE) { delete exception; ACE_DEBUG ((LM_ERROR, "(%P|%t) invoke, unmarshal %s exception %s\n", (reply_status == TAO_GIOP_USER_EXCEPTION) ? "user" : "system", buf)); send_error (this->handler_); return TAO_GIOP_SYSTEM_EXCEPTION; } env.exception (exception); return (TAO_GIOP_ReplyStatusType) reply_status; } } // If we couldn't find this exception's typecode, report it as // an OA error since the skeleton passed an exception that was // not allowed by the operation's IDL definition. In the case // of a dynamic skeleton it's actually an implementation bug. // // It's known to be _very_ misleading to try reporting this as // any kind of marshaling error (unless minor codes are made // to be _very_ useful) ... folk try to find/fix ORB bugs that // don't exist, not bugs in/near the implementation code. if (reply_status == TAO_GIOP_USER_EXCEPTION) env.exception (new CORBA::OBJ_ADAPTER (CORBA::COMPLETED_YES)); else env.exception (new CORBA::INTERNAL (CORBA::COMPLETED_MAYBE)); return TAO_GIOP_SYSTEM_EXCEPTION; } // NOTREACHED case TAO_GIOP_LOCATION_FORWARD: { CORBA::Object_ptr obj; IIOP_Object *obj2; // Unmarshal the object we _should_ be calling. We know that // one of the facets of this object will be an IIOP invocation // profile. if (this->stream_.decode (CORBA::_tc_Object, &obj, 0, env) != CORBA::TypeCode::TRAVERSE_CONTINUE || obj->QueryInterface (IID_IIOP_Object, (void **) &obj2) != TAO_NOERROR) { dexc (env, "invoke, location forward"); send_error (this->handler_); return TAO_GIOP_SYSTEM_EXCEPTION; } CORBA::release (obj); // Make a copy of the IIOP profile in the forwarded objref, // reusing memory where practical. Then delete the forwarded // objref, retaining only its profile. // // XXX add and use a "forward count", to prevent loss of data // in forwarding chains during concurrent calls -- only a // forward that's a response to the current fwd_profile should // be recorded here. (This is just an optimization, and is not // related to correctness.) #if 0 /* Keep this around in case forwarding is ever implemented. */ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, data_->fwd_profile_lock (), TAO_GIOP_SYSTEM_EXCEPTION); #endif IIOP::Profile *old = data_->fwd_profile_i (new IIOP::Profile (obj2->profile)); delete old; obj2->Release (); env.clear (); // Make sure a new connection is used next time. this->handler_->close (); this->handler_ = 0; // @@ not sure this is correct! // We may not need to do this since TAO_GIOP_Invocations // get created on a per-call basis. For now we'll play it safe. } break; } // All standard exceptions from here on in the call path know for // certain that the call "completed" ... except in the case of // system exceptions which say otherwise, and for // TAO_GIOP_LOCATION_FORWARD responses. return (TAO_GIOP_ReplyStatusType) reply_status; } void TAO_GIOP::make_error (CDR &msg, ...) { ACE_UNUSED_ARG (msg); // just for now // This [static] method will be somewhat like send_error() except // that it won't actaully do any sending of data...it'll just stuff // things into the instance. } CORBA::Boolean TAO_GIOP_LocateRequestHeader::init (CDR &msg, CORBA::Environment &env) { return (msg.get_ulong (this->request_id) && msg.decode (&TC_opaque, &this->object_key, 0, env)); } // Initialize the request header from , setting for errors. CORBA::Boolean TAO_GIOP_RequestHeader::init (CDR &msg, CORBA::Environment &env) { CORBA::Boolean hdr_status; // Tear out the service context ... we currently ignore it, but it // should probably be passed to each ORB service as appropriate // (e.g. transactions, security). // // NOTE: As security support kicks in, this is a good place to // verify a digital signature, if that is required in this security // environment. It may be required even when using IPSEC security // infrastructure. hdr_status = msg.decode (&TC_ServiceContextList, &this->service_info, 0, env); // Get the rest of the request header ... hdr_status = hdr_status && msg.get_ulong (this->request_id); hdr_status = hdr_status && msg.get_boolean (this->response_expected); hdr_status = hdr_status && msg.decode (&TC_opaque, &this->object_key, 0, env); hdr_status = hdr_status && msg.decode (CORBA::_tc_string, &this->operation, 0, env); hdr_status = hdr_status && msg.decode (CORBA::_tc_Principal, &this->requesting_principal, 0, env); return hdr_status; } CORBA::Boolean TAO_GIOP::start_message (TAO_GIOP_MsgType type, CDR &msg) { msg.reset (); // if (msg.size () < TAO_GIOP_HEADER_LEN) // return CORBA::B_FALSE; char* next = msg.buffer (); next [0] = 'G'; next [1] = 'I'; next [2] = 'O'; next [3] = 'P'; next [4] = MY_MAJOR; next [5] = MY_MINOR; next [6] = TAO_ENCAP_BYTE_ORDER; next [7] = (u_char) type; msg.wr_ptr (TAO_GIOP_HEADER_LEN); return CORBA::B_TRUE; } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class TAO_Unbounded_Sequence; template class TAO_Unbounded_Sequence; template class CORBA_SEQUENCE; template class CORBA_SEQUENCE; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate TAO_Unbounded_Sequence #pragma instantiate TAO_Unbounded_Sequence #pragma instantiate CORBA_SEQUENCE #pragma instantiate CORBA_SEQUENCE #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */