1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
// $Id$
#include "Protocol_Task.h"
// Constructor: stores the requested thread count in
// desired_thr_count_.  No threads are started here; open() reads the
// stored value later and decides whether to activate the task.
Protocol_Task::Protocol_Task(int _thr_count)
  : desired_thr_count_(_thr_count)
{
}
// Destructor: intentionally empty -- this file performs no cleanup
// of its own when the object is destroyed.
Protocol_Task::~Protocol_Task()
{
}
// Open the task.
//
// arg is ignored.  When the thread count given to the constructor is
// zero the task is left passive and 0 is returned.  Otherwise the
// task is activated with THR_NEW_LWP and that thread count, and the
// result of activate() is handed back to the caller.
int Protocol_Task::open(void *arg)
{
  ACE_UNUSED_ARG(arg);

  // No threads requested: nothing to activate.
  if (!desired_thr_count_)
    return 0;

  return this->activate(THR_NEW_LWP, desired_thr_count_);
}
/* Close the task.

   When flags == 1 and the task is active (is_active()), a message
   block of type MB_HANGUP is dropped onto the message queue so that
   svc() will leave its loop, and then we wait() for the threads.
   In every other case there is nothing to do and 0 is returned.

   Returns the result of wait() on the hangup path, -1 if the hangup
   block could not be queued, 0 otherwise.

   Ownership: the hangup block is handed straight to the queue (svc()
   releases whatever it dequeues).  If putq() fails the block is
   still ours, so it is released before the error return instead of
   being leaked.
*/
int Protocol_Task::close(u_long flags)
{
  // Nothing to hang up unless we are being shut down with flags == 1
  // and there are threads to worry about.
  if (flags != 1 || !is_active())
    return 0;

  ACE_Message_Block *hangupBlock = new ACE_Message_Block();
  hangupBlock->msg_type(ACE_Message_Block::MB_HANGUP);

  if (this->putq(hangupBlock) == -1)
    {
      // The queue did not accept the block; give up our reference so
      // the error path does not leak it.
      hangupBlock->release();
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close() putq"), -1);
    }

  // The block now belongs to the queue; just wait for svc() to exit.
  return this->wait();
}
/* Accept a unit of work.

   A passive task (is_active() is false) handles the message right
   away by calling process().  An active task instead places the
   message on its queue with putq() so that svc() can pick it up.
   In both cases the callee's return value is passed through.
*/
int Protocol_Task::put(ACE_Message_Block *message, ACE_Time_Value *timeout)
{
  // No threads of our own: do the work in the caller's thread.
  if (!is_active())
    return this->process(message, timeout);

  // Otherwise defer the work to svc() via the message queue.
  return this->putq(message, timeout);
}
/* svc() is about what you would expect. This is again lifted
directly from Tutorial 14 but with a call to process() for handling
the logic instead of doing the work right here.
*/
int Protocol_Task::svc(void)
{
ACE_Message_Block * message;
while (1)
{
// Get a message
if ( this->getq(message, 0) == -1) {
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Protocol_Task::svc() getq"), -1);
}
ACE_DEBUG ((LM_DEBUG, "(%P|%t) Protocol_Task::svc() got message\n"));
// Check for hangup
if (message->msg_type() == ACE_Message_Block::MB_HANGUP) {
ACE_DEBUG ((LM_DEBUG, "(%P|%t) Protocol_Task::svc() -- HANGUP block received\n"));
// Hangup our thread-pool peers (if any)
if (this->putq(message->duplicate()) == -1) {
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Protocol_Task::svc() putq"), -1);
}
// Leave svc()
break;
}
// Do some work on the data.
if( this->process(message->duplicate(),0) == -1 )
{
break;
}
// Give up the message block before we go get another.
message->release();
}
// Give up the message block that caused us to exit the
// while(1) loop.
message->release();
return(0);
}
/* Dispatch a message according to the side of the stream we sit on.

   A writer task (is_writer() is true) forwards the message to
   send(); any other task forwards it to recv().  The return value of
   whichever virtual function was called is returned unchanged.
*/
int Protocol_Task::process(ACE_Message_Block * message, ACE_Time_Value *timeout)
{
  // Not the writer side: this is data for recv().
  if (!this->is_writer())
    return this->recv(message, timeout);

  return this->send(message, timeout);
}
/* Default send(): always fails.

   We must insist that derivatives provide a meaningful overload for
   send() and recv().  It's fairly common for ACE object methods to
   return an error when an overload is expected but the method cannot
   be safely made pure virtual.

   Both arguments are ignored (marked with ACE_UNUSED_ARG, as open()
   does, to avoid unused-parameter warnings).  Always returns -1.
*/
int Protocol_Task::send(ACE_Message_Block *message,
                        ACE_Time_Value *timeout)
{
  ACE_UNUSED_ARG(message);
  ACE_UNUSED_ARG(timeout);
  return -1;
}
/* Default recv(): always fails.

   Derived classes are expected to supply their own implementation;
   this version ignores both arguments (marked with ACE_UNUSED_ARG,
   as open() does, to avoid unused-parameter warnings) and always
   returns -1.
*/
int Protocol_Task::recv(ACE_Message_Block * message,
                        ACE_Time_Value *timeout)
{
  ACE_UNUSED_ARG(message);
  ACE_UNUSED_ARG(timeout);
  return -1;
}
|