From f2d5398eecca7c54fe41a302755202f04834cdcc Mon Sep 17 00:00:00 2001 From: schmidt Date: Tue, 10 Nov 1998 05:34:20 +0000 Subject: . --- ChangeLog-98b | 6 +++ examples/ASX/Message_Queue/buffer_stream.cpp | 70 ++++++++++++++++++++-------- 2 files changed, 57 insertions(+), 19 deletions(-) diff --git a/ChangeLog-98b b/ChangeLog-98b index 19e87a75a03..aebbb79db8f 100644 --- a/ChangeLog-98b +++ b/ChangeLog-98b @@ -1,3 +1,9 @@ +Mon Nov 9 23:27:59 1998 Douglas C. Schmidt + + * examples/ASX/Message_Queue/buffer_stream.cpp (main): Fixed an + off-by-one bug. Thanks to Rainer Blome + for reporting this. + Mon Nov 09 21:31:56 1998 David L. Levine * tests/Env_Value_Test (main): prepended "./" to name of diff --git a/examples/ASX/Message_Queue/buffer_stream.cpp b/examples/ASX/Message_Queue/buffer_stream.cpp index a3132c0beac..c610b1a73da 100644 --- a/examples/ASX/Message_Queue/buffer_stream.cpp +++ b/examples/ASX/Message_Queue/buffer_stream.cpp @@ -51,12 +51,13 @@ class Consumer : public Common_Task public: Consumer (void) {} + virtual int put (ACE_Message_Block *mb, + ACE_Time_Value *tv = 0); // Enqueue the message on the ACE_Message_Queue for subsequent // handling in the svc() method. - virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); - // Receive message from producer and print to stdout. virtual int svc (void); + // Receive message from producer and print to stdout. private: @@ -71,8 +72,9 @@ class Filter : public MT_Task public: Filter (void): count_ (1) {} + virtual int put (ACE_Message_Block *mb, + ACE_Time_Value *tv = 0); // Change the size of the message before passing it downstream. - virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); private: size_t count_; @@ -85,15 +87,20 @@ int Common_Task::open (void *) { if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "spawn"), + -1); return 0; } int Common_Task::close (u_long exit_status) { - ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting with status %d in module %s\n", - exit_status, this->name ())); + ACE_DEBUG ((LM_DEBUG, + "(%t) thread is exiting with status %d in module %s\n", + exit_status, + this->name ())); // Can do anything here that is required when a thread exits, e.g., // storing thread-specific information in some other storage @@ -119,9 +126,11 @@ Producer::svc (void) ACE_Message_Block *mb; - ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1); + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ + 1), + -1); - n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ()); + n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), BUFSIZ); if (n <= 0) { @@ -129,7 +138,9 @@ Producer::svc (void) mb->length (0); if (this->put_next (mb) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "put_next")); break; } @@ -142,7 +153,9 @@ Producer::svc (void) mb->rd_ptr ()[n] = '\0'; if (this->put_next (mb) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "put_next")); } } @@ -175,7 +188,8 @@ Consumer::svc (void) { ACE_Message_Block *mb; - this->timeout_.sec (ACE_OS::time (0) + 4); // Wait for upto 4 seconds + // Wait for upto 4 seconds. + this->timeout_.sec (ACE_OS::time (0) + 4); result = this->getq (mb, &this->timeout_); @@ -246,20 +260,37 @@ main (int, char *argv[]) MT_Module *fm; MT_Module *cm; - ACE_NEW_RETURN (pm, MT_Module ("Consumer", new Consumer), -1); - ACE_NEW_RETURN (fm, MT_Module ("Filter", new Filter), -1); - ACE_NEW_RETURN (cm, MT_Module ("Producer", new Producer), -1); + ACE_NEW_RETURN (pm, + MT_Module ("Consumer", + new Consumer), + -1); + ACE_NEW_RETURN (fm, + MT_Module ("Filter", + new Filter), + -1); + ACE_NEW_RETURN (cm, + MT_Module ("Producer", + new Producer), + -1); // Create Producer and Consumer Modules and push them onto the // Stream. All processing is performed in the Stream. if (stream.push (pm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), 1); + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "push"), + 1); else if (stream.push (fm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), 1); + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "push"), + 1); else if (stream.push (cm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), 1); - + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "push"), + 1); // Barrier synchronization: wait for the threads to exit, then exit // ourselves. ACE_Thread_Manager::instance ()->wait (); @@ -269,7 +300,8 @@ main (int, char *argv[]) int main (int, char *[]) { - ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + ACE_ERROR ((LM_ERROR, + "threads not supported on this platform\n")); return 0; } #endif /* ACE_HAS_THREADS */ -- cgit v1.2.1