summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2001-06-26 00:26:01 +0000
committerbala <balanatarajan@users.noreply.github.com>2001-06-26 00:26:01 +0000
commitdd8e7d30aff4087b2243f36e282e5923ec35ae41 (patch)
treec36685d175cf6f001b9b85be4a4f49a7fdd3d635
parentde0f6a7f015db1a546863c25e8237c4062ed8146 (diff)
downloadATCD-dd8e7d30aff4087b2243f36e282e5923ec35ae41.tar.gz
ChangeLogTag: Mon Jun 25 19:21:43 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp76
-rw-r--r--TAO/tao/GIOP_Message_Base.h7
-rw-r--r--TAO/tao/GIOP_Message_Base.i46
-rw-r--r--TAO/tao/GIOP_Message_State.inl9
-rw-r--r--TAO/tao/IIOP_Transport.cpp35
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp2
-rw-r--r--TAO/tao/Incoming_Message_Queue.h7
-rw-r--r--TAO/tao/Incoming_Message_Queue.inl9
-rw-r--r--TAO/tao/Pluggable_Messaging.h5
-rw-r--r--TAO/tao/Transport.cpp90
-rw-r--r--TAO/tao/Transport.h12
11 files changed, 192 insertions, 106 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index e3882c63802..8fb375beb2b 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -318,7 +318,6 @@ int
TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
{
-
if (this->message_state_.parse_message_header (incoming) == -1)
{
return -1;
@@ -330,24 +329,73 @@ TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
return 0;
}
-size_t
+ssize_t
TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming)
{
- // @@Bala: Look for fragmentation here..
- // If we had recd. fragmented messages and if the GIOP minor version
- // is greater than 1, then include the FRAGMENT HEADER to calculate
- // the effective length of the message
- /*if (this->message_state_.more_fragments_ &&
- this->message_state_.giop_version_.minor > 1)
- len -= TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
- */
+ //Actual message size including the header..
+ CORBA::ULong msg_size =
+ this->message_state_.message_size ();
+
+ ssize_t len = incoming.length ();
+
+ if (len > msg_size)
+ {
+ // Move the rd_ptr so that we can extract the next message.
+ incoming.rd_ptr (msg_size);
+ return -1;
+ }
+ else if (len == msg_size)
+ return 0;
+
+ return msg_size - len;
+}
+
+
+int
+TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *qd)
+{
+ TAO_GIOP_Message_State msg_state;
+
+ if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ if (incoming.length () > 0)
+ {
+ qd =
+ this->make_queued_data (incoming.length ());
+
+ qd.missing_data_ = -1;
+ }
+ return 0;
+ }
+
+
+ if (msg_state.parse_message_header (incoming) == -1)
+ {
+ return -1;
+ }
+
+ size_t copying_len = msg_state.message_size ();
+
+ qd = this->make_queued_data (copying_len);
+
+ if (copying_len > incoming.length ())
+ {
+ qd.missing_data_ =
+ copying_len - incoming.length ();
+
+ copying_len -= incoming.length ();
+ }
- size_t len = incoming.length ();
+ new_mb.copy (incoming.rd_ptr (),
+ copying_len);
- if (len >= this->message_state_.message_size ())
- return 0;
+ incoming.rd_ptr (copying_len);
+ qd.byte_order_ = msg_state.byte_order_;
+ qd.major_version_ = msg_state.giop_version_.major_version;
+ qd.minor_version_ = msg_state.giop_version_.minor_version;
- return this->message_state_.message_size () - len;
+ return 1;
}
CORBA::Octet
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index 1ca368005fe..e3f03e6ebec 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -108,7 +108,10 @@ public:
virtual int is_message_complete (ACE_Message_Block &message_block);
/// @@Bala:Documentation please..
- virtual size_t missing_data (ACE_Message_Block &message_block);
+ virtual ssize_t missing_data (ACE_Message_Block &message_block);
+
+ virtual int extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *qd);
virtual CORBA::Octet byte_order (void);
@@ -201,6 +204,8 @@ private:
/// Are there any more messages that needs processing
virtual int more_messages (void);
+ /// @@Bala:Docu??
+ TAO_Queued_Data *make_queued_data (size_t sz);
private:
/// Thr message handler object that does reading and parsing of the
diff --git a/TAO/tao/GIOP_Message_Base.i b/TAO/tao/GIOP_Message_Base.i
index 45b18282a10..33bebef4aac 100644
--- a/TAO/tao/GIOP_Message_Base.i
+++ b/TAO/tao/GIOP_Message_Base.i
@@ -4,41 +4,25 @@
//
// GIOP_Message_Base
//
-# if 0
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::header_len (void)
+TAO_Queued_Data *
+GIOP_Message_Base::make_queued_data (size_t sz)
{
- return TAO_GIOP_MESSAGE_HEADER_LEN;
-}
+ qd = TAO_Incoming_Message_Queue::get_queued_data ();
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::message_size_offset (void)
-{
- return TAO_GIOP_MESSAGE_SIZE_OFFSET;
-}
+ ACE_Data_Block *db =
+ this->orb_core_->data_block_for_message_block (sz);
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::major_version_offset (void)
-{
- return TAO_GIOP_VERSION_MAJOR_OFFSET;
-}
+ ACE_Allocator *alloc =
+ this->orb_core_->message_block_msgblock_allocator ();
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::minor_version_offset (void)
-{
- return TAO_GIOP_VERSION_MINOR_OFFSET;
-}
+ ACE_Message_Block mb (db,
+ 0,
+ alloc);
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::flags_offset (void)
-{
- return TAO_GIOP_MESSAGE_FLAGS_OFFSET;
-}
+ ACE_Message_Block *new_mb = mb.duplicate ();
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::message_type_offset (void)
-{
- return TAO_GIOP_MESSAGE_TYPE_OFFSET;
-}
+ qd.msg_block_ = new_mb;
+ ACE_CDR::mb_align (new_mb);
-#endif /*if 0*/
+ return qd;
+}
diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl
index 9238ba596d4..cf326079aad 100644
--- a/TAO/tao/GIOP_Message_State.inl
+++ b/TAO/tao/GIOP_Message_State.inl
@@ -5,7 +5,14 @@
ACE_INLINE CORBA::ULong
TAO_GIOP_Message_State::message_size (void) const
{
- return this->message_size_ + TAO_GIOP_MESSAGE_HEADER_LEN;
+ CORBA::ULong len =
+ this->message_size_ + TAO_GIOP_MESSAGE_HEADER_LEN;
+
+ if (this->more_fragments_ &&
+ this->giop_version_.minor > 1)
+ len += TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
+
+ return len;
}
ACE_INLINE CORBA::ULong
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index 997226316cc..22f56d89dad 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -83,9 +83,38 @@ TAO_IIOP_Transport::recv_i (char *buf,
size_t len,
const ACE_Time_Value *max_wait_time)
{
- return this->connection_handler_->peer ().recv (buf,
- len,
- max_wait_time);
+ ssize_t n = this->connection_handler_->peer ().recv (buf,
+ len,
+ max_wait_time);
+
+ // Most of the errors handling is common for
+ // Now the message has been read
+ if (n == -1 && TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p \n"),
+ ACE_TEXT ("TAO - read message failure \n")
+ ACE_TEXT ("TAO - recv_i () \n")));
+ }
+
+ // Error handling
+ if (n == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ return 0;
+
+ // Close the connection
+ this->tms_->connection_closed ();
+
+ return -1;
+ }
+ // @@ What are the other error handling here??
+ else if (n == 0)
+ {
+ return -1;
+ }
+
+ return n;
}
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index e5b239b775c..910af84ab26 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -23,7 +23,7 @@ TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void)
int
TAO_Incoming_Message_Queue::add_message (ACE_Message_Block *incoming,
- size_t missing_data,
+ ssize_t missing_data,
CORBA::Octet byte_order)
{
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
index f5c111f3e5f..9c5515a02d9 100644
--- a/TAO/tao/Incoming_Message_Queue.h
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -66,7 +66,8 @@ public:
/// The actual message queue
ACE_Message_Block *msg_block_;
- CORBA::ULong missing_data_;
+
+ CORBA::Long missing_data_;
CORBA::Octet byte_order_;
@@ -77,6 +78,7 @@ public:
TAO_Queued_Data *next_;
};
+ static TAO_Queued_Data* get_queued_data (void);
private:
friend class TAO_Transport;
@@ -84,8 +86,9 @@ private:
/// @@Bala:Docu
+ TAO_Queued_Data *get_node (void);
+
- TAO_Queued_Data* get_node (void);
int add_node (TAO_Queued_Data *nd);
private:
diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl
index 10713dcf732..8057696682d 100644
--- a/TAO/tao/Incoming_Message_Queue.inl
+++ b/TAO/tao/Incoming_Message_Queue.inl
@@ -32,9 +32,8 @@ TAO_Incoming_Message_Queue::missing_data (void) const
}
-
ACE_INLINE TAO_Incoming_Message_Queue::TAO_Queued_Data *
-TAO_Incoming_Message_Queue::get_node (void)
+TAO_Incoming_Message_Queue::get_queued_data (void)
{
// @@TODO: Use the global pool for allocationg...
TAO_Queued_Data *qd = 0;
@@ -45,6 +44,12 @@ TAO_Incoming_Message_Queue::get_node (void)
return qd;
}
+ACE_INLINE TAO_Incoming_Message_Queue::TAO_Queued_Data *
+TAO_Incoming_Message_Queue::get_node (void)
+{
+ return TAO_Incoming_Message_Queue::get_queued_data ();
+}
+
ACE_INLINE
TAO_Incoming_Message_Queue::TAO_Queued_Data::TAO_Queued_Data (void)
: msg_block_ (0),
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index 288d946999e..15801c595f7 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -126,10 +126,13 @@ public:
/// @@Bala: Documentation please...
virtual int is_message_complete (ACE_Message_Block &message_block) = 0;
- virtual size_t missing_data (ACE_Message_Block &incoming) = 0;
+ virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0;
virtual CORBA::Octet byte_order (void) = 0;
+ virtual int extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *qd) = 0;
+
/// Parse the request message, make an upcall and send the reply back
/// to the "request initiator"
virtual int process_request_message (TAO_Transport *transport,
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index cd15fe869d3..be4f08f456a 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -738,37 +738,7 @@ TAO_Transport::recv (char *buffer,
return -1;
// now call the template method
- ssize_t n =
- this->recv_i (buffer, len, timeout);
-
- // Most of the errors handling is common for
- // Now the message has been read
- if (n == -1 && TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p \n"),
- ACE_TEXT ("TAO - read message failure \n")
- ACE_TEXT ("TAO - handle_input () \n")));
- }
-
- // Error handling
- if (n == -1)
- {
- if (errno == EWOULDBLOCK)
- return 0;
-
- // Close the connection
- this->tms_->connection_closed ();
-
- return -1;
- }
- // @@ What are the other error handling here??
- else if (n == 0)
- {
- return -1;
- }
-
- return n;
+ return this->recv_i (buffer, len, timeout);
}
@@ -867,15 +837,19 @@ TAO_Transport::handle_input_i (ACE_HANDLE h,
return -1;
// Check whether we have a complete message for processing
- size_t missing_data =
+ ssize_t missing_data =
this->missing_data (message_block);
- if (missing_data)
+ if (missing_data < 0)
+ {
+ this->consolidate_extra_messages (message_block);
+ }
+ else if (missing_data > 0)
{
- return this->consolidate_message (message_block,
- missing_data,
- h,
- max_wait_time);
+ return this->consolidate_process_message (message_block,
+ missing_data,
+ h,
+ max_wait_time);
}
@@ -891,6 +865,9 @@ TAO_Transport::handle_input_i (ACE_HANDLE h,
int
TAO_Transport::parse_incoming_messages (ACE_Message_Block &message_block)
{
+ // @@Bala:What about requests whose headers have been completely
+ // read in the last read????
+
// If we have a queue and if the last message is not complete a
// complete one, then this read will get us the remaining data. So
// do not try to parse the header if we have an incomplete message
@@ -920,9 +897,9 @@ TAO_Transport::parse_incoming_messages (ACE_Message_Block &message_block)
size_t
TAO_Transport::missing_data (ACE_Message_Block &incoming)
{
- // If we have message in the queue then find out how much of data
- // is required to get a complete message
- if (this->incoming_message_queue_.queue_length ())
+ // If we have a incomplete message in the queue then find out how
+ // much of data is required to get a complete message
+ if (!this->incoming_message_queue_.is_complete_message ())
{
return this->incoming_message_queue_.missing_data ();
}
@@ -932,10 +909,10 @@ TAO_Transport::missing_data (ACE_Message_Block &incoming)
int
-TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
- size_t missing_data,
- ACE_HANDLE h,
- ACE_Time_Value *max_wait_time)
+TAO_Transport::consolidate_process_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ ACE_HANDLE h,
+ ACE_Time_Value *max_wait_time)
{
// The write pointer which will be used for reading data from the
// socket.
@@ -1000,7 +977,7 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
int
TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
- size_t missing_data,
+ ssize_t missing_data,
ACE_HANDLE h,
ACE_Time_Value *max_wait_time)
{
@@ -1050,6 +1027,29 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
return 0;
}
+
+
+int
+TAO_Transport::consolidate_extra_messages (ACE_Message_Block &incoming)
+{
+ int retval = 1;
+ while (retval == 1)
+ {
+ TAO_Queued_Data *q_data = 0;
+
+ retval =
+ this->messaging_object ()->extract_next_message (incoming,
+ q_data);
+ if (q_data)
+ this->incoming_message_queue_.add_node (qd);
+ }
+
+ if (retval == -1)
+ return retval;
+
+ return 0;
+}
+
int
TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block,
CORBA::Octet byte_order,
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 789761ee4e0..5ec795a5035 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -574,16 +574,18 @@ protected:
int check_message_integrity (ACE_Message_Block &message_block);
- int consolidate_message (ACE_Message_Block &incoming,
- size_t missing_data,
- ACE_HANDLE h,
- ACE_Time_Value *max_wait_time);
+ int consolidate_process_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ ACE_HANDLE h,
+ ACE_Time_Value *max_wait_time);
int consolidate_message_queue (ACE_Message_Block &incoming,
- size_t missing_data,
+ ssize_t missing_data,
ACE_HANDLE h,
ACE_Time_Value *max_wait_time);
+ void consolidate_extra_messages (ACE_Message_Block &incoming);
+
/// @@ Bala: Documentation
virtual int process_parsed_messages (ACE_Message_Block &message_block,
CORBA::Octet byte_order,