summaryrefslogtreecommitdiff
path: root/docs/tutorials/015/Protocol_Task.cpp
blob: 3cfe7495539bd2065a53f18280511233309cbe77 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146

// $Id$

#include "Protocol_Task.h"

// Build the task and record how many threads were requested.  The
// constructor only stores the count; open() is where it gets used.
Protocol_Task::Protocol_Task( int _thr_count )
    : desired_thr_count_( _thr_count )
{
}

// The destructor body is empty.
Protocol_Task::~Protocol_Task()
{
}

// Activate the object only when a non-zero thread count was given to
// the constructor.  The argument is ignored.  Returns 0 when no
// activation is requested, otherwise whatever activate() returns.
int Protocol_Task::open(void *arg)
{
    ACE_UNUSED_ARG(arg);

    // No threads requested: nothing to start.
    if( ! desired_thr_count_ )
    {
        return 0;
    }

    return this->activate(THR_NEW_LWP, desired_thr_count_);
}

/* When we're being closed by the ACE_Stream (flags == 1) and we've
   got threads to worry about, drop a hangup message onto the message
   queue so that svc() will go away, then wait() for it to do so.
   In every other case there is nothing to do and we return 0.

   Returns -1 if the hangup message could not be queued, otherwise the
   result of wait().
*/
int Protocol_Task::close(u_long flags)
{
    if (flags != 1 || ! is_active() )
    {
        return 0;
    }

    ACE_Message_Block *hangupBlock = new ACE_Message_Block();

    hangupBlock->msg_type(ACE_Message_Block::MB_HANGUP);

        // Hand the block itself to the queue.  Queuing a duplicate and
        // then releasing the original comes to the same thing, but
        // leaves two blocks to clean up when putq() fails.
    if (this->putq(hangupBlock) == -1) {
            // Log first so that %p reports the errno left by putq(),
            // then give the block back instead of leaking it.
        ACE_ERROR ((LM_ERROR, "%p\n", "Task::close() putq"));
        hangupBlock->release();
        return -1;
    }

        // The queue owns the block now; don't touch it again.
    return this->wait();
}

/* put() chooses between two paths.  When the task is not active the
   unit of work is handed straight to process() on the caller's
   thread.  When it is active the message goes onto the message queue
   and svc() picks it up later.  Either way the callee's result is
   returned unchanged.
*/
int Protocol_Task::put(ACE_Message_Block *message,ACE_Time_Value *timeout)
{
    if( ! is_active() )
    {
        // No threads of our own: do the work right now.
        return this->process(message,timeout);
    }

    // Defer the work to svc().
    return this->putq(message,timeout);
}

/* svc() is about what you would expect.  This is again lifted
   directly from Tutorial 14 but with a call to process() for handling 
   the logic instead of doing the work right here.
 */
int Protocol_Task::svc(void)
{
    ACE_Message_Block * message;
    
    while (1)
    {
            // Get a message
        if ( this->getq(message, 0) == -1) {
            ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Protocol_Task::svc() getq"), -1);
        }

        ACE_DEBUG ((LM_DEBUG, "(%P|%t) Protocol_Task::svc() got message\n"));

            // Check for hangup
        if (message->msg_type() == ACE_Message_Block::MB_HANGUP) {
 
            ACE_DEBUG ((LM_DEBUG, "(%P|%t) Protocol_Task::svc() -- HANGUP block received\n"));

                // Hangup our thread-pool peers (if any)
            if (this->putq(message->duplicate()) == -1) {
                ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Protocol_Task::svc() putq"), -1);
            }

                // Leave svc()
            break;
        }

            // Do some work on the data.
        if( this->process(message->duplicate(),0) == -1 )
        {
            break;
        }

            // Give up the message block before we go get another.
        message->release();
    }

        // Give up the message block that caused us to exit the
        // while(1) loop.
    message->release();
    
    return(0);
}

/* process() is a simple dispatcher.  is_writer() tells us which side
   of the stream we are on: the writer side invokes send(), the other
   side invokes recv().  The result of whichever virtual function was
   called is returned as-is.
*/
int Protocol_Task::process(ACE_Message_Block * message, ACE_Time_Value *timeout)
{
    if( ! this->is_writer() )
    {
        return this->recv(message,timeout);
    }

    return this->send(message,timeout);
}

/* We must insist that derivatives provide a meaningful override for
   send() and recv().  It's fairly common for ACE object methods to
   return an error when an override is expected but the method cannot
   be safely made pure virtual.

   This default send() does nothing with its arguments and always
   returns -1.
 */

int Protocol_Task::send(ACE_Message_Block *message,
                        ACE_Time_Value *timeout)
{
    // Neither argument is used by the default implementation.
    ACE_UNUSED_ARG(message);
    ACE_UNUSED_ARG(timeout);

    return -1;
}

/* Default recv(): does nothing with its arguments and always returns
   -1.  Derivatives are expected to supply their own version.
 */
int Protocol_Task::recv(ACE_Message_Block * message,
                        ACE_Time_Value *timeout)
{
    // Neither argument is used by the default implementation.
    ACE_UNUSED_ARG(message);
    ACE_UNUSED_ARG(timeout);

    return -1;
}