summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1998-11-10 05:34:20 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1998-11-10 05:34:20 +0000
commitf2d5398eecca7c54fe41a302755202f04834cdcc (patch)
treea7deac0cb027c3e462ec193a3a913faf2d864bbd
parent7d3f10ec1b023d883c02c533716eb84b138aa858 (diff)
downloadATCD-f2d5398eecca7c54fe41a302755202f04834cdcc.tar.gz
.
-rw-r--r--ChangeLog-98b6
-rw-r--r--examples/ASX/Message_Queue/buffer_stream.cpp70
2 files changed, 57 insertions, 19 deletions
diff --git a/ChangeLog-98b b/ChangeLog-98b
index 19e87a75a03..aebbb79db8f 100644
--- a/ChangeLog-98b
+++ b/ChangeLog-98b
@@ -1,3 +1,9 @@
+Mon Nov 9 23:27:59 1998 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * examples/ASX/Message_Queue/buffer_stream.cpp (main): Fixed an
+ off-by-one bug. Thanks to Rainer Blome
+ <rainer_blome@de.ibm.com> for reporting this.
+
Mon Nov 09 21:31:56 1998 David L. Levine <levine@cs.wustl.edu>
* tests/Env_Value_Test (main): prepended "./" to name of
diff --git a/examples/ASX/Message_Queue/buffer_stream.cpp b/examples/ASX/Message_Queue/buffer_stream.cpp
index a3132c0beac..c610b1a73da 100644
--- a/examples/ASX/Message_Queue/buffer_stream.cpp
+++ b/examples/ASX/Message_Queue/buffer_stream.cpp
@@ -51,12 +51,13 @@ class Consumer : public Common_Task
public:
Consumer (void) {}
+ virtual int put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv = 0);
// Enqueue the message on the ACE_Message_Queue for subsequent
// handling in the svc() method.
- virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
- // Receive message from producer and print to stdout.
virtual int svc (void);
+ // Receive message from producer and print to stdout.
private:
@@ -71,8 +72,9 @@ class Filter : public MT_Task
public:
Filter (void): count_ (1) {}
+ virtual int put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv = 0);
// Change the size of the message before passing it downstream.
- virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
private:
size_t count_;
@@ -85,15 +87,20 @@ int
Common_Task::open (void *)
{
if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "spawn"),
+ -1);
return 0;
}
int
Common_Task::close (u_long exit_status)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting with status %d in module %s\n",
- exit_status, this->name ()));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) thread is exiting with status %d in module %s\n",
+ exit_status,
+ this->name ()));
// Can do anything here that is required when a thread exits, e.g.,
// storing thread-specific information in some other storage
@@ -119,9 +126,11 @@ Producer::svc (void)
ACE_Message_Block *mb;
- ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1);
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ + 1),
+ -1);
- n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ());
+ n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), BUFSIZ);
if (n <= 0)
{
@@ -129,7 +138,9 @@ Producer::svc (void)
mb->length (0);
if (this->put_next (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "put_next"));
break;
}
@@ -142,7 +153,9 @@ Producer::svc (void)
mb->rd_ptr ()[n] = '\0';
if (this->put_next (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "put_next"));
}
}
@@ -175,7 +188,8 @@ Consumer::svc (void)
{
ACE_Message_Block *mb;
- this->timeout_.sec (ACE_OS::time (0) + 4); // Wait for upto 4 seconds
+ // Wait for upto 4 seconds.
+ this->timeout_.sec (ACE_OS::time (0) + 4);
result = this->getq (mb, &this->timeout_);
@@ -246,20 +260,37 @@ main (int, char *argv[])
MT_Module *fm;
MT_Module *cm;
- ACE_NEW_RETURN (pm, MT_Module ("Consumer", new Consumer), -1);
- ACE_NEW_RETURN (fm, MT_Module ("Filter", new Filter), -1);
- ACE_NEW_RETURN (cm, MT_Module ("Producer", new Producer), -1);
+ ACE_NEW_RETURN (pm,
+ MT_Module ("Consumer",
+ new Consumer),
+ -1);
+ ACE_NEW_RETURN (fm,
+ MT_Module ("Filter",
+ new Filter),
+ -1);
+ ACE_NEW_RETURN (cm,
+ MT_Module ("Producer",
+ new Producer),
+ -1);
// Create Producer and Consumer Modules and push them onto the
// Stream. All processing is performed in the Stream.
if (stream.push (pm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), 1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "push"),
+ 1);
else if (stream.push (fm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), 1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "push"),
+ 1);
else if (stream.push (cm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), 1);
-
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "push"),
+ 1);
// Barrier synchronization: wait for the threads to exit, then exit
// ourselves.
ACE_Thread_Manager::instance ()->wait ();
@@ -269,7 +300,8 @@ main (int, char *argv[])
int
main (int, char *[])
{
- ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ ACE_ERROR ((LM_ERROR,
+ "threads not supported on this platform\n"));
return 0;
}
#endif /* ACE_HAS_THREADS */