summaryrefslogtreecommitdiff
path: root/ACE/examples/C++NPv2/display_logfile.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/C++NPv2/display_logfile.cpp')
-rw-r--r--ACE/examples/C++NPv2/display_logfile.cpp361
1 files changed, 361 insertions, 0 deletions
diff --git a/ACE/examples/C++NPv2/display_logfile.cpp b/ACE/examples/C++NPv2/display_logfile.cpp
new file mode 100644
index 00000000000..19f251e9463
--- /dev/null
+++ b/ACE/examples/C++NPv2/display_logfile.cpp
@@ -0,0 +1,361 @@
+/*
+** $Id$
+**
+** Copyright 2002 Addison Wesley. All Rights Reserved.
+*/
+
+#include "ace/ACE.h"
+#include "ace/CDR_Stream.h"
+#include "ace/FILE_Addr.h"
+#include "ace/FILE_Connector.h"
+#include "ace/FILE_IO.h"
+#include "ace/Message_Block.h"
+#include "ace/Module.h"
+#include "ace/SString.h"
+#include "ace/Stream.h"
+#include "ace/Task.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Lock_Adapter_T.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_time.h"
+
+template <class TASK>
+class Logrec_Module : public ACE_Module<ACE_SYNCH>
+{
+public:
+ Logrec_Module (const ACE_TCHAR *name)
+ {
+ this->open (name,
+ &task_, // Initialize writer-side task.
+ 0, // Ignore reader-side task.
+ 0,
+ ACE_Module<ACE_SYNCH>::M_DELETE_READER);
+ }
+private:
+ TASK task_;
+};
+#define LOGREC_MODULE(NAME) \
+ typedef Logrec_Module<NAME> NAME##_Module
+
+class Logrec_Reader : public ACE_Task<ACE_SYNCH>
+{
+private:
+ ACE_TString filename_; // Name of logfile.
+ ACE_FILE_IO logfile_; // File containing log records.
+
+public:
+ enum {MB_CLIENT = ACE_Message_Block::MB_USER,
+ MB_TYPE, MB_PID, MB_TIME, MB_TEXT};
+
+ Logrec_Reader (const ACE_TString &file): filename_ (file) {}
+
+ virtual int open (void *) {
+ ACE_FILE_Addr name (filename_.c_str ());
+ ACE_FILE_Connector connector;
+ if (connector.connect (logfile_, name) == -1)
+ return -1;
+ return activate ();
+ }
+
+ virtual int svc () {
+ const size_t FileReadSize = 8 * 1024;
+ ACE_Message_Block mblk (FileReadSize);
+
+ for (;; mblk.crunch ()) {
+ // Read as much as will fit in the message block.
+ ssize_t bytes_read = logfile_.recv (mblk.wr_ptr (),
+ mblk.space ());
+ if (bytes_read <= 0)
+ break;
+ mblk.wr_ptr (static_cast<size_t> (bytes_read));
+
+ // We have a bunch of data from the log file. The data is
+ // arranged like so:
+ // hostname\0
+ // CDR-encoded log record
+ // So, first we scan for the end of the host name, then
+ // initialize another ACE_Message_Block aligned for CDR
+ // demarshaling and copy the remainder of the block into it. We
+ // can't use duplicate() because we need to be sure the data
+ // pointer is aligned properly for CDR demarshaling. If at any
+ // point, there's not enough data left in the message block to
+ // extract what's needed, crunch the block to move all remaining
+ // data to the beginning and read more from the file.
+ for (;;) {
+ size_t name_len = ACE_OS::strnlen
+ (mblk.rd_ptr (), mblk.length ());
+ if (name_len == mblk.length ()) break;
+
+ char *name_p = mblk.rd_ptr ();
+ ACE_Message_Block *rec, *head, *temp;
+ ACE_NEW_RETURN
+ (head, ACE_Message_Block (name_len, MB_CLIENT), 0);
+ head->copy (name_p, name_len);
+ mblk.rd_ptr (name_len + 1); // Skip nul also
+
+ size_t need = mblk.length () + ACE_CDR::MAX_ALIGNMENT;
+ ACE_NEW_RETURN (rec, ACE_Message_Block (need), 0);
+ ACE_CDR::mb_align (rec);
+ rec->copy (mblk.rd_ptr (), mblk.length ());
+
+ // Now rec contains the remaining data we've read so far from
+ // the file. Create an ACE_InputCDR to start demarshaling the
+ // log record, header first to find the length, then the data.
+ // Since the ACE_InputCDR constructor increases the reference count
+ // on rec, we release it upon return to prevent leaks.
+ // The cdr 'read' methods return 0 on failure, 1 on success.
+ ACE_InputCDR cdr (rec); rec->release ();
+ ACE_CDR::Boolean byte_order;
+ if (!cdr.read_boolean (byte_order)) {
+ head->release (); rec->release (); break;
+ }
+ cdr.reset_byte_order (byte_order);
+
+ // Now read the length of the record. From there, we'll know
+ // if rec contains the complete record or not.
+ ACE_CDR::ULong length;
+ if (!cdr.read_ulong (length)) {
+ head->release (); mblk.rd_ptr (name_p); break;
+ }
+ if (length > cdr.length ()) {
+ head->release (); mblk.rd_ptr (name_p); break;
+ }
+
+ // The complete record is in rec... grab all the fields into
+ // separate, chained message blocks.
+ ACE_NEW_RETURN (temp,
+ ACE_Message_Block (length, MB_TEXT),
+ 0);
+ ACE_NEW_RETURN
+ (temp,
+ ACE_Message_Block (2 * sizeof (ACE_CDR::Long),
+ MB_TIME, temp),
+ 0);
+ ACE_NEW_RETURN
+ (temp,
+ ACE_Message_Block (sizeof (ACE_CDR::Long),
+ MB_PID, temp),
+ 0);
+ ACE_NEW_RETURN
+ (temp,
+ ACE_Message_Block (sizeof (ACE_CDR::Long),
+ MB_TYPE, temp),
+ 0);
+ head->cont (temp);
+
+ // Extract the type
+ ACE_CDR::Long *lp;
+ lp = reinterpret_cast<ACE_CDR::Long*> (temp->wr_ptr ());
+ cdr >> *lp;
+ temp->wr_ptr (sizeof (ACE_CDR::Long));
+ temp = temp->cont ();
+
+ // Extract the pid
+ lp = reinterpret_cast<ACE_CDR::Long*> (temp->wr_ptr ());
+ cdr >> *lp;
+ temp->wr_ptr (sizeof (ACE_CDR::Long));
+ temp = temp->cont ();
+
+ // Extract the timestamp (2 Longs)
+ lp = reinterpret_cast<ACE_CDR::Long*> (temp->wr_ptr ());
+ cdr >> *lp; ++lp; cdr >> *lp;
+ temp->wr_ptr (2 * sizeof (ACE_CDR::Long));
+ temp = temp->cont ();
+
+ // Demarshal the length of the message text, then demarshal
+ // the text into the block.
+ ACE_CDR::ULong text_len;
+ cdr >> text_len;
+ cdr.read_char_array (temp->wr_ptr (), text_len);
+ temp->wr_ptr (text_len);
+
+ // Forward the whole lot to the next module.
+ if (put_next (head) == -1) break;
+
+ // Move the file-content block's read pointer up past whatever
+ // was just processed. Although the mblk's rd_ptr has not been
+ // moved, cdr's has. Therefore, use its length() to determine
+ // how much is left.
+ mblk.rd_ptr (mblk.length () - cdr.length ());
+ }
+ }
+
+ // Now that the file is done, send a block down the stream to tell
+ // the other modules to stop.
+ ACE_Message_Block *stop;
+ ACE_NEW_RETURN
+ (stop, ACE_Message_Block (0, ACE_Message_Block::MB_STOP),
+ 0);
+ put_next (stop);
+ return 0;
+ }
+};
+
+class Logrec_Reader_Module : public ACE_Module<ACE_SYNCH>
+{
+public:
+ Logrec_Reader_Module (const ACE_TString &filename)
+ : task_ (filename)
+ {
+ this->open (ACE_TEXT ("Logrec Reader"),
+ &task_, // Initialize writer-side.
+ 0, // Ignore reader-side.
+ 0,
+ ACE_Module<ACE_SYNCH>::M_DELETE_READER);
+ }
+private:
+ Logrec_Reader task_;
+};
+
+class Logrec_Writer : public ACE_Task<ACE_SYNCH>
+{
+public:
+ // Initialization hook method.
+ virtual int open (void *) { return activate (); }
+
+ virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to)
+ { return putq (mblk, to); }
+
+ virtual int svc () {
+ int stop = 0;
+ for (ACE_Message_Block *mb; !stop && getq (mb) != -1; ) {
+ if (mb->msg_type () == ACE_Message_Block::MB_STOP)
+ stop = 1;
+ else
+ ACE::write_n (ACE_STDOUT, mb);
+ put_next (mb);
+ }
+ return 0;
+ }
+};
+
+LOGREC_MODULE (Logrec_Writer);
+
+class Logrec_Formatter : public ACE_Task<ACE_SYNCH>
+{
+public:
+ typedef void (*FORMATTER[5])(ACE_Message_Block *);
+private:
+ static FORMATTER format_; // Array of format static methods.
+
+public:
+ virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *) {
+ if (mblk->msg_type () == Logrec_Reader::MB_CLIENT)
+ for (ACE_Message_Block *temp = mblk;
+ temp != 0;
+ temp = temp->cont ()) {
+ int mb_type =
+ temp->msg_type () - ACE_Message_Block::MB_USER;
+ (*format_[mb_type])(temp);
+ }
+ return put_next (mblk);
+ }
+
+ static void format_client (ACE_Message_Block *) {
+ return;
+ }
+
+ static void format_type (ACE_Message_Block *mblk) {
+ ACE_CDR::Long type = * (ACE_CDR::Long *)mblk->rd_ptr ();
+ mblk->size (11); // Max size in ASCII of 32-bit word.
+ mblk->reset ();
+ mblk->wr_ptr ((size_t) sprintf (mblk->wr_ptr (), "%d", type));
+ }
+
+ static void format_pid (ACE_Message_Block *mblk) {
+ ACE_CDR::Long pid = * (ACE_CDR::Long *)mblk->rd_ptr ();
+ mblk->size (11); // Max size in ASCII of 32-bit word.
+ mblk->reset ();
+ mblk->wr_ptr ((size_t) sprintf (mblk->wr_ptr (), "%d", pid));
+ }
+
+ static void format_time (ACE_Message_Block *mblk) {
+ ACE_CDR::Long secs = * (ACE_CDR::Long *)mblk->rd_ptr ();
+ mblk->rd_ptr (sizeof (ACE_CDR::Long));
+ ACE_CDR::Long usecs = * (ACE_CDR::Long *)mblk->rd_ptr ();
+ ACE_TCHAR timestamp_t[26];
+ char timestamp[26]; // Max size of ctime_r() string.
+ time_t time_secs (secs);
+ ACE_OS::ctime_r (&time_secs, timestamp_t, sizeof timestamp_t);
+ ACE_OS::strcpy (timestamp, ACE_TEXT_ALWAYS_CHAR (timestamp_t));
+ mblk->size (26); // Max size of ctime_r() string.
+ mblk->reset ();
+ timestamp[19] = '\0'; // NUL-terminate after the time.
+ timestamp[24] = '\0'; // NUL-terminate after the date.
+ size_t fmt_len (sprintf (mblk->wr_ptr (),
+ "%s.%03d %s",
+ timestamp + 4,
+ usecs / 1000,
+ timestamp + 20));
+ mblk->wr_ptr (fmt_len);
+ }
+
+ static void format_data (ACE_Message_Block *) {
+ return;
+ }
+};
+
+Logrec_Formatter::FORMATTER Logrec_Formatter::format_ = {
+ format_client, format_type, format_pid, format_time, format_data
+};
+
+LOGREC_MODULE (Logrec_Formatter);
+
+class Logrec_Separator : public ACE_Task<ACE_SYNCH>
+{
+private:
+ ACE_Lock_Adapter<ACE_SYNCH_MUTEX> lock_strategy_;
+
+public:
+ virtual int put (ACE_Message_Block *mblk,
+ ACE_Time_Value *) {
+ if (mblk->msg_type () != ACE_Message_Block::MB_STOP) {
+ ACE_Message_Block *separator;
+ ACE_NEW_RETURN
+ (separator,
+ ACE_Message_Block (ACE_OS::strlen ("|") + 1,
+ ACE_Message_Block::MB_DATA,
+ 0, 0, 0, &lock_strategy_),
+ -1);
+ separator->copy ("|");
+
+ ACE_Message_Block *dup = 0;
+ for (ACE_Message_Block *temp = mblk; temp != 0; ) {
+ dup = separator->duplicate ();
+ dup->cont (temp->cont ());
+ temp->cont (dup);
+ temp = dup->cont ();
+ }
+ ACE_Message_Block *nl;
+ ACE_NEW_RETURN (nl, ACE_Message_Block (2), 0);
+ nl->copy ("\n");
+ dup->cont (nl);
+ separator->release ();
+ }
+ return put_next (mblk);
+ }
+};
+
+LOGREC_MODULE (Logrec_Separator);
+
+int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ if (argc != 2)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s logfile\n", argv[0]),
+ 1);
+ ACE_TString logfile (argv[1]);
+ ACE_Stream<ACE_SYNCH> stream;
+
+ if (stream.push
+ (new Logrec_Writer_Module (ACE_TEXT ("Writer"))) != -1
+ && stream.push
+ (new Logrec_Separator_Module (ACE_TEXT ("Separator"))) != -1
+ && stream.push
+ (new Logrec_Formatter_Module (ACE_TEXT ("Formatter"))) != -1
+ && stream.push
+ (new Logrec_Reader_Module (logfile)) != -1)
+ return ACE_Thread_Manager::instance ()->wait () == 0 ? 0 : 1;
+ return 1;
+}
+