summaryrefslogtreecommitdiff
path: root/examples/C++NPv2/display_logfile.cpp
blob: 19f251e94633893627e2464666bfbb95f5e43f16 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
/*
** $Id$
**
** Copyright 2002 Addison Wesley. All Rights Reserved.
*/

#include "ace/ACE.h"
#include "ace/CDR_Stream.h"
#include "ace/FILE_Addr.h"
#include "ace/FILE_Connector.h"
#include "ace/FILE_IO.h"
#include "ace/Message_Block.h"
#include "ace/Module.h"
#include "ace/SString.h"
#include "ace/Stream.h"
#include "ace/Task.h"
#include "ace/Thread_Manager.h"
#include "ace/Lock_Adapter_T.h"
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_time.h"

template <class TASK>
class Logrec_Module : public ACE_Module<ACE_SYNCH>
{
public:
  Logrec_Module (const ACE_TCHAR *name)
  {
    this->open (name,
		&task_, // Initialize writer-side task.
		0,      // Ignore reader-side task.
		0,
		ACE_Module<ACE_SYNCH>::M_DELETE_READER);
  }
private:
  TASK task_;
};
#define LOGREC_MODULE(NAME) \
  typedef Logrec_Module<NAME> NAME##_Module

class Logrec_Reader : public ACE_Task<ACE_SYNCH>
{
private:
  ACE_TString filename_; // Name of logfile.
  ACE_FILE_IO logfile_;  // File containing log records.

public:
  enum {MB_CLIENT = ACE_Message_Block::MB_USER,
        MB_TYPE, MB_PID, MB_TIME, MB_TEXT};

  Logrec_Reader (const ACE_TString &file): filename_ (file) {}

  virtual int open (void *) {
    ACE_FILE_Addr name (filename_.c_str ());
    ACE_FILE_Connector connector;
    if (connector.connect (logfile_, name) == -1)
      return -1;
    return activate ();
  }

  virtual int svc () {
    const size_t FileReadSize = 8 * 1024;
    ACE_Message_Block mblk (FileReadSize);

    for (;; mblk.crunch ()) {
      // Read as much as will fit in the message block.
      ssize_t bytes_read = logfile_.recv (mblk.wr_ptr (),
                                          mblk.space ());
      if (bytes_read <= 0)
        break;
      mblk.wr_ptr (static_cast<size_t> (bytes_read));

      // We have a bunch of data from the log file. The data is
      // arranged like so:
      //    hostname\0
      //    CDR-encoded log record
      // So, first we scan for the end of the host name, then
      // initialize another ACE_Message_Block aligned for CDR
      // demarshaling and copy the remainder of the block into it. We
      // can't use duplicate() because we need to be sure the data
      // pointer is aligned properly for CDR demarshaling.  If at any
      // point, there's not enough data left in the message block to
      // extract what's needed, crunch the block to move all remaining
      // data to the beginning and read more from the file.
      for (;;) {
        size_t name_len = ACE_OS::strnlen
                             (mblk.rd_ptr (), mblk.length ());
        if (name_len == mblk.length ()) break;

        char *name_p = mblk.rd_ptr ();
        ACE_Message_Block *rec, *head, *temp;
        ACE_NEW_RETURN
          (head, ACE_Message_Block (name_len, MB_CLIENT), 0);
        head->copy (name_p, name_len);
        mblk.rd_ptr (name_len + 1);   // Skip nul also

        size_t need = mblk.length () + ACE_CDR::MAX_ALIGNMENT;
        ACE_NEW_RETURN (rec, ACE_Message_Block (need), 0);
        ACE_CDR::mb_align (rec);
        rec->copy (mblk.rd_ptr (), mblk.length ());

        // Now rec contains the remaining data we've read so far from
        // the file. Create an ACE_InputCDR to start demarshaling the
        // log record, header first to find the length, then the data.
        // Since the ACE_InputCDR constructor increases the reference count
        // on rec, we release it upon return to prevent leaks.
        // The cdr 'read' methods return 0 on failure, 1 on success.
        ACE_InputCDR cdr (rec); rec->release ();
        ACE_CDR::Boolean byte_order;
        if (!cdr.read_boolean (byte_order)) {
          head->release (); rec->release (); break;
        }
        cdr.reset_byte_order (byte_order);

        // Now read the length of the record. From there, we'll know
        // if rec contains the complete record or not.
        ACE_CDR::ULong length;
        if (!cdr.read_ulong (length)) {
          head->release (); mblk.rd_ptr (name_p); break;
        }
        if (length > cdr.length ()) {
          head->release (); mblk.rd_ptr (name_p); break;
        }

        // The complete record is in rec... grab all the fields into
        // separate, chained message blocks.
        ACE_NEW_RETURN (temp,
                        ACE_Message_Block (length, MB_TEXT),
                        0);
        ACE_NEW_RETURN
          (temp,
           ACE_Message_Block (2 * sizeof (ACE_CDR::Long),
                              MB_TIME, temp),
           0);
        ACE_NEW_RETURN
          (temp,
           ACE_Message_Block (sizeof (ACE_CDR::Long),
                              MB_PID, temp),
           0);
        ACE_NEW_RETURN
          (temp,
           ACE_Message_Block (sizeof (ACE_CDR::Long),
                              MB_TYPE, temp),
           0);
        head->cont (temp);

        // Extract the type
        ACE_CDR::Long *lp;
        lp = reinterpret_cast<ACE_CDR::Long*> (temp->wr_ptr ());
        cdr >> *lp;
        temp->wr_ptr (sizeof (ACE_CDR::Long));
        temp = temp->cont ();

        // Extract the pid
        lp = reinterpret_cast<ACE_CDR::Long*> (temp->wr_ptr ());
        cdr >> *lp;
        temp->wr_ptr (sizeof (ACE_CDR::Long));
        temp = temp->cont ();

        // Extract the timestamp (2 Longs)
        lp = reinterpret_cast<ACE_CDR::Long*> (temp->wr_ptr ());
        cdr >> *lp; ++lp; cdr >> *lp;
        temp->wr_ptr (2 * sizeof (ACE_CDR::Long));
        temp = temp->cont ();

        // Demarshal the length of the message text, then demarshal
        // the text into the block.
        ACE_CDR::ULong text_len;
        cdr >> text_len;
        cdr.read_char_array (temp->wr_ptr (), text_len);
        temp->wr_ptr (text_len);

        // Forward the whole lot to the next module.
        if (put_next (head) == -1) break;

        // Move the file-content block's read pointer up past whatever
        // was just processed. Although the mblk's rd_ptr has not been
        // moved, cdr's has.  Therefore, use its length() to determine
        // how much is left.
        mblk.rd_ptr (mblk.length () - cdr.length ());
      }
    }

    // Now that the file is done, send a block down the stream to tell
    // the other modules to stop.
    ACE_Message_Block *stop;
    ACE_NEW_RETURN
      (stop, ACE_Message_Block (0, ACE_Message_Block::MB_STOP),
       0);
    put_next (stop);
    return 0;
  }
};

class Logrec_Reader_Module : public ACE_Module<ACE_SYNCH>
{
public:
  Logrec_Reader_Module (const ACE_TString &filename)
    : task_ (filename)
  {
    this->open (ACE_TEXT ("Logrec Reader"),
                &task_, // Initialize writer-side.
                0,      // Ignore reader-side.
                0,
                ACE_Module<ACE_SYNCH>::M_DELETE_READER);
  }
private:
  Logrec_Reader task_;
};

class Logrec_Writer : public ACE_Task<ACE_SYNCH>
{
public:
  // Initialization hook method.
  virtual int open (void *) { return activate (); }

  virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to)
  { return putq (mblk, to); }

  virtual int svc () {
    int stop = 0;
    for (ACE_Message_Block *mb; !stop && getq (mb) != -1; ) {
      if (mb->msg_type () == ACE_Message_Block::MB_STOP)
        stop = 1;
      else
        ACE::write_n (ACE_STDOUT, mb);
      put_next (mb);
    }
    return 0;
  }
};

LOGREC_MODULE (Logrec_Writer);

class Logrec_Formatter : public ACE_Task<ACE_SYNCH>
{
public:
  typedef void (*FORMATTER[5])(ACE_Message_Block *);
private:
  static FORMATTER format_; // Array of format static methods.

public:
  virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *) {
    if (mblk->msg_type () == Logrec_Reader::MB_CLIENT)
      for (ACE_Message_Block *temp = mblk;
           temp != 0;
           temp = temp->cont ()) {
        int mb_type =
          temp->msg_type () - ACE_Message_Block::MB_USER;
        (*format_[mb_type])(temp);
      }
    return put_next (mblk);
  }

  static void format_client (ACE_Message_Block *) {
    return;
  }

  static void format_type (ACE_Message_Block *mblk) {
    ACE_CDR::Long type = * (ACE_CDR::Long *)mblk->rd_ptr ();
    mblk->size (11); // Max size in ASCII of 32-bit word.
    mblk->reset ();
    mblk->wr_ptr ((size_t) sprintf (mblk->wr_ptr (), "%d", type));
  }

  static void format_pid (ACE_Message_Block *mblk) {
    ACE_CDR::Long pid = * (ACE_CDR::Long *)mblk->rd_ptr ();
    mblk->size (11); // Max size in ASCII of 32-bit word.
    mblk->reset ();
    mblk->wr_ptr ((size_t) sprintf (mblk->wr_ptr (), "%d", pid));
  }

  static void format_time (ACE_Message_Block *mblk) {
    ACE_CDR::Long secs = * (ACE_CDR::Long *)mblk->rd_ptr ();
    mblk->rd_ptr (sizeof (ACE_CDR::Long));
    ACE_CDR::Long usecs = * (ACE_CDR::Long *)mblk->rd_ptr ();
    ACE_TCHAR timestamp_t[26];
    char timestamp[26]; // Max size of ctime_r() string.
    time_t time_secs (secs);
    ACE_OS::ctime_r (&time_secs, timestamp_t, sizeof timestamp_t);
    ACE_OS::strcpy (timestamp, ACE_TEXT_ALWAYS_CHAR (timestamp_t));
    mblk->size (26); // Max size of ctime_r() string.
    mblk->reset ();
    timestamp[19] = '\0'; // NUL-terminate after the time.
    timestamp[24] = '\0'; // NUL-terminate after the date.
    size_t fmt_len (sprintf (mblk->wr_ptr (),
                             "%s.%03d %s",
                             timestamp + 4,
                             usecs / 1000,
                             timestamp + 20));
    mblk->wr_ptr (fmt_len);
  }

  static void format_data (ACE_Message_Block *) {
    return;
  }
};

Logrec_Formatter::FORMATTER Logrec_Formatter::format_ = {
  format_client, format_type, format_pid, format_time, format_data
};

LOGREC_MODULE (Logrec_Formatter);

class Logrec_Separator : public ACE_Task<ACE_SYNCH>
{
private:
  ACE_Lock_Adapter<ACE_SYNCH_MUTEX> lock_strategy_;

public:
  virtual int put (ACE_Message_Block *mblk,
                   ACE_Time_Value *) {
    if (mblk->msg_type () != ACE_Message_Block::MB_STOP) {
      ACE_Message_Block *separator;
      ACE_NEW_RETURN
        (separator,
         ACE_Message_Block (ACE_OS::strlen ("|") + 1,
                            ACE_Message_Block::MB_DATA,
                            0, 0, 0, &lock_strategy_),
         -1);
      separator->copy ("|");

      ACE_Message_Block *dup = 0;
      for (ACE_Message_Block *temp = mblk; temp != 0; ) {
        dup = separator->duplicate ();
        dup->cont (temp->cont ());
        temp->cont (dup);
        temp = dup->cont ();
      }
      ACE_Message_Block *nl;
      ACE_NEW_RETURN (nl, ACE_Message_Block (2), 0);
      nl->copy ("\n");
      dup->cont (nl);
      separator->release ();
    }
    return put_next (mblk);
  }
};

LOGREC_MODULE (Logrec_Separator);

int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  if (argc != 2)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "usage: %s logfile\n", argv[0]),
                      1);
  ACE_TString logfile (argv[1]);
  ACE_Stream<ACE_SYNCH> stream;

  if (stream.push
      (new Logrec_Writer_Module (ACE_TEXT ("Writer"))) != -1
      && stream.push
      (new Logrec_Separator_Module (ACE_TEXT ("Separator"))) != -1
      && stream.push
      (new Logrec_Formatter_Module (ACE_TEXT ("Formatter"))) != -1
      && stream.push
      (new Logrec_Reader_Module (logfile)) != -1)
    return ACE_Thread_Manager::instance ()->wait () == 0 ? 0 : 1;
  return 1;
}