summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2001-06-29 23:40:37 +0000
committerbala <balanatarajan@users.noreply.github.com>2001-06-29 23:40:37 +0000
commitb6582bce44f6ef6c80c085a93aa643822571bccc (patch)
treee96e87b04c73907c716a2861ae7dd214d8826c74
parent61767d4431a8a8c1e6a880afef613456eae44671 (diff)
downloadATCD-b6582bce44f6ef6c80c085a93aa643822571bccc.tar.gz
ChangeLogTag:Thu Jun 29 18:30:03 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp8
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp9
-rw-r--r--TAO/tao/Incoming_Message_Queue.h2
-rw-r--r--TAO/tao/LIST_OF_TODO5
-rw-r--r--TAO/tao/Transport.cpp137
-rw-r--r--TAO/tao/Transport.h6
6 files changed, 132 insertions, 35 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 65493035213..84166c92089 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -355,6 +355,9 @@ int
TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
TAO_Queued_Data *&qd)
{
+ //ACE_DEBUG ((LM_DEBUG,
+ // "TAO (%P|%t) Extracting extra messages... \n"));
+
TAO_GIOP_Message_State state (this->orb_core_,
this);
@@ -381,6 +384,9 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
size_t copying_len = state.message_size ();
+ // ACE_DEBUG ((LM_DEBUG,
+ // "TAO (%P|%t) ... queueing messages.. \n"));
+
qd = this->make_queued_data (copying_len);
if (copying_len > incoming.length ())
@@ -388,7 +394,7 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
qd->missing_data_ =
copying_len - incoming.length ();
- copying_len -= incoming.length ();
+ copying_len = incoming.length ();
}
qd->msg_block_->copy (incoming.rd_ptr (),
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index 3582b151019..727d2e29961 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -22,13 +22,13 @@ TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void)
// Need to delete all the unused data-blocks
}
-void
+size_t
TAO_Incoming_Message_Queue::copy_message (ACE_Message_Block &block)
{
+ size_t n = 0;
+
if (this->size_ > 0)
{
- size_t n = 0;
-
if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_)
{
n = block.length ();
@@ -40,8 +40,11 @@ TAO_Incoming_Message_Queue::copy_message (ACE_Message_Block &block)
this->queued_data_->msg_block_->copy (block.rd_ptr (),
n);
+
this->queued_data_->missing_data_ -= n;
}
+
+ return n;
}
TAO_Queued_Data *
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
index 91427672e2a..3bb2c25a3cf 100644
--- a/TAO/tao/Incoming_Message_Queue.h
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -46,7 +46,7 @@ public:
ssize_t missing_data,
CORBA::Octet byte_order);*/
- void copy_message (ACE_Message_Block &block);
+ size_t copy_message (ACE_Message_Block &block);
CORBA::ULong queue_length (void);
diff --git a/TAO/tao/LIST_OF_TODO b/TAO/tao/LIST_OF_TODO
index c51df858af8..3f7cd661c94 100644
--- a/TAO/tao/LIST_OF_TODO
+++ b/TAO/tao/LIST_OF_TODO
@@ -9,11 +9,10 @@
- AMI tests
- run purify quantify
- DSI_Gateway tests
-- Remove the ORB_Core from the signature of a number of methods of
- GIOP_Message_Base class
- Go through the code again & again...
- Put tms_->close_connection () wherever we get -1 as a retval..
- Looks for a memory leak when we delete a node..
- Change the Output CDR to be on stack..
- Dont we want a cached transport on the server side...
-- BIG_REPLY test, needs to be trimmed down a bit... \ No newline at end of file
+- BIG_REPLY test, needs to be trimmed down a bit...
+- Look at purge_pending_notifications ().. \ No newline at end of file
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 319132a151a..1e7f9d61844 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -125,7 +125,7 @@ TAO_Transport::~TAO_Transport (void)
}
int
-TAO_Transport::handle_output ()
+TAO_Transport::handle_output (void)
{
if (TAO_debug_level > 4)
{
@@ -827,47 +827,89 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
max_wait_time);
+
if (n <= 0)
- return n;
+ {
+ // Even if we have read failure just try to see if we have a
+ // message on the head of the queue and try to process that...
+ this->process_queue_head (rh);
+
+ if (this->incoming_message_queue_.queue_length () == 0)
+ return n;
+ return 0;
+ }
+
+ /* if (TAO_debug_level > 2)
+ {*/
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) Read [%d] bytes \n",
+ n));
+ // }
// Set the write pointer in the stack buffer
message_block.wr_ptr (n);
- if (this->parse_incoming_messages (message_block) == -1)
+ int retval = this->parse_consolidate_messages (message_block,
+ rh,
+ max_wait_time);
+
+ if (retval <= 0)
+ {
+ if (retval == -1 && TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport::handle_input_i [%d] "
+ "Error while parsing and consolidating \n",
+ n));
+ }
+ return retval;
+ }
+
+ // Make a node of the message block..
+ TAO_Queued_Data qd;
+ qd.msg_block_ = &message_block;
+
+ // Extract the data for the node..
+ this->messaging_object ()->get_message_data (&qd);
+
+ // Process the message
+ return this->process_parsed_messages (&qd,
+ rh);
+}
+
+int
+TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
+{
+ // Parse the incoming message for validity. The check needs to be
+ // performed by the messaging objects.
+ if (this->parse_incoming_messages (block) == -1)
return -1;
// Check whether we have a complete message for processing
- ssize_t missing_data =
- this->missing_data (message_block);
+ ssize_t missing_data = this->missing_data (block);
if (missing_data < 0)
{
- return this->consolidate_extra_messages (message_block,
+ // If we have read too many messages..
+ return this->consolidate_extra_messages (block,
rh);
}
else if (missing_data > 0)
{
- return this->consolidate_message (message_block,
+ // If we have missing data then try doing a read or try queueing
+ // them.
+ return this->consolidate_message (block,
missing_data,
rh,
max_wait_time);
}
-
- TAO_Queued_Data qd;
- qd.msg_block_ = &message_block;
- qd.missing_data_ = missing_data;
-
- this->messaging_object ()->get_message_data (&qd);
-
- // @@Bala:
- return this->process_parsed_messages (&qd,
- rh);
+ return 1;
}
-
-
int
TAO_Transport::parse_incoming_messages (ACE_Message_Block &message_block)
{
@@ -990,17 +1032,39 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
// If the queue did not have a complete message put this piece of
// message in the queue. We kow it did not have a complete
// message. That is why we are here.
- this->incoming_message_queue_.copy_message (incoming);
+ size_t n = this->incoming_message_queue_.copy_message (incoming);
+
+ // Update the missing data...
missing_data = this->incoming_message_queue_.missing_data ();
+ // Move the read pointer of the <incoming> message block to the end
+ // of the copied message and process the remaining portion...
+ incoming.rd_ptr (n);
- // @@todo: What will happen if we have a part of the next message in
- // the incoming message block? If that is a one-way call we handle
- // it differently. We will be in soup if the next message is a
- // two-way call. We need to process that too.... Can we call
- // process_messages () with rd_ptr () of teh incoming_message (),
- // moved?
+ // If we have some more information left in the message block..
+ if (incoming.length ())
+ {
+ // We may have to parse & consolidate. This part of the message
+ // doesn't seem to be part of the last message in the queue (as
+ // the copy () hasn't taken away this message).
+ int retval = this->parse_consolidate_messages (incoming,
+ rh,
+ max_wait_time);
+
+ // If there is an error return
+ if (retval == -1)
+ {
+ if (TAO_debug_level)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Error while consolidating... \n",
+ "TAO (%P|%t) - .. part of the read message \n"));
+ }
+ return retval;
+ }
+ }
+ // If we still have some missing data..
if (missing_data > 0)
{
// Get the last message from the Queue
@@ -1010,6 +1074,8 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
ACE_Message_Block *mb =
qd->msg_block_;
+ // Try to do a read again. If we have some luck it would be
+ // great..
ssize_t n = this->recv (mb->wr_ptr (),
missing_data,
max_wait_time);
@@ -1028,6 +1094,7 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
this->incoming_message_queue_.enqueue_tail (qd);
}
+ // Process a message in the head of the queue if we have one..
return this->process_queue_head (rh);
}
@@ -1037,6 +1104,8 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block
&incoming,
TAO_Resume_Handle &rh)
{
+ //ACE_DEBUG ((LM_DEBUG,
+ // "TAO (%P|%t) Consolidating multiple messages.. \n"));
// Take a message from the tail..
TAO_Queued_Data *tail =
this->incoming_message_queue_.dequeue_tail ();
@@ -1051,6 +1120,10 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block
}
int retval = 1;
+
+ //ACE_DEBUG ((LM_DEBUG,
+ // "TAO (%P|%t) Extracting multiple messages.. \n"));
+
while (retval == 1)
{
TAO_Queued_Data *q_data = 0;
@@ -1209,6 +1282,13 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
int
TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
{
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::process_queue_head \n",
+ this->id ()));
+ }
+
// See if the message in the head of the queue is complete...
if (this->incoming_message_queue_.is_head_complete () == 1)
{
@@ -1237,6 +1317,7 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
}
// Send a notification to the reactor...
+ // @@ What do we do if the notify to the reactor fails??
reactor->notify (eh,
ACE_Event_Handler::READ_MASK);
}
@@ -1250,9 +1331,11 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
// Delete the Queued_Data..
delete qd;
+
+ return 0;
}
- return 0;
+ return -1;
}
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index f7fe3f6f3f2..965547d3369 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -568,7 +568,13 @@ protected:
*/
virtual void transition_handler_state_i (void) = 0;
+
/// @@ Bala: Documentation
+
+ int parse_consolidate_messages (ACE_Message_Block &bl,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *time);
+
int parse_incoming_messages (ACE_Message_Block &message_block);
size_t missing_data (ACE_Message_Block &message_block);