1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
/* -*- C++ -*- */
// $Id$
/* These Gateway handler classes process Peer messages sent from the
communication gateway daemon (gatewayd) to its various peers, e.g.,
CF and ETS (represented collectively in this prototype as peerd).
These classes work as follows:
1. Gateway_Acceptor creates a listener endpoint and waits passively
for gatewayd to connect with it.
2. When gatewayd connects, Gateway_Acceptor creates a
Gateway_Handler object that sends/receives messages to/from
gatewayd.
3. Gateway_Handler waits for gatewayd to inform it of its routing
ID, which is prepended to all outgoing messages sent from peerd.
4. Once the routing ID is set, peerd periodically sends messages to
gatewayd. Peerd also receives and "processes" messages
forwarded to it from gatewayd. In this program, peerd
"processes" messages by writing them to stdout. */
#if !defined (GATEWAY_HANDLER)
#define GATEWAY_HANDLER
#include "ace/Service_Config.h"
#include "ace/Svc_Handler.h"
#include "ace/Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/Map_Manager.h"
#include "Peer_Message.h"
// Forward declaration: the map typedefs below name Gateway_Handler *
// before the class itself is defined.
class Gateway_Handler;
// Maps an ACE_HANDLE onto a Gateway_Handler *.  The lock parameter is
// ACE_Null_Mutex.
typedef ACE_Map_Manager <ACE_HANDLE, Gateway_Handler *, ACE_Null_Mutex> HANDLER_MAP;
// Iterator type instantiated with the same key, value and lock types
// as HANDLER_MAP.
typedef ACE_Map_Iterator<ACE_HANDLE, Gateway_Handler *, ACE_Null_Mutex> HANDLER_ITERATOR;
// Entry type pairing an ACE_HANDLE key with a Gateway_Handler * value.
typedef ACE_Map_Entry <ACE_HANDLE, Gateway_Handler *> MAP_ENTRY;
// Handle Peer messages arriving as events.  One Gateway_Handler
// sends/receives messages to/from gatewayd; it is an ACE_Svc_Handler
// parameterized with ACE_SOCK_STREAM and ACE_NULL_SYNCH.  Only the
// declarations live in this header; the member definitions are in an
// implementation file not shown here.
class Gateway_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
{
public:
Gateway_Handler (ACE_Thread_Manager * = 0);
// Constructor.  The ACE_Thread_Manager argument is optional and
// defaults to 0.
virtual int open (void * = 0);
// Initialize the handler (called by ACE_Acceptor::handle_input()).
virtual int handle_input (ACE_HANDLE);
// Receive and process peer messages.
virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
// Send a message to a gateway (may be queued if necessary).
virtual int handle_output (ACE_HANDLE);
// Finish sending a message when flow control conditions abate.
virtual int handle_timeout (const ACE_Time_Value &,
const void *arg);
// Periodically send messages via the ACE_Reactor timer mechanism.
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
// Perform object termination.  Both arguments are optional and default
// to ACE_INVALID_HANDLE and ACE_Event_Handler::RWE_MASK respectively.
void map (HANDLER_MAP *);
// Cache a binding to the HANDLER_MAP.
protected:
typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> inherited;
// Shorthand name for the base class.
// We'll allow up to 16 megabytes (1024 * 1024 * 16 bytes) to be queued
// per-output channel!!!!  This is clearly a policy in search of
// refinement...
enum { QUEUE_SIZE = 1024 * 1024 * 16 };
int handle_signal (int signum, siginfo_t *, ucontext_t *);
// Signal hook taking the signal number plus siginfo_t/ucontext_t
// pointers.  NOTE(review): which signals are handled, and how, is
// decided in the implementation file -- not visible from this header.
Peer_Header::ROUTING_ID routing_id_;
// Routing ID of the peer (obtained from gatewayd).
virtual int nonblk_put (ACE_Message_Block *mb);
// Perform a non-blocking put().
virtual int recv_peer (ACE_Message_Block *&);
// Receive a Peer message from gatewayd.  The message block pointer is
// passed by reference.
virtual int send_peer (ACE_Message_Block *);
// Send a Peer message to gatewayd.
int xmit_stdin (void);
// Receive a message from stdin and send it to the gateway.
int (Gateway_Handler::*do_action_) (void);
// Pointer-to-member-function for the current action to run in this
// state.  NOTE(review): await_route_id() and await_messages() below
// have the matching signature and are presumably the intended targets
// -- confirm against the implementation.
int await_route_id (void);
// Action that receives the route id.
int await_messages (void);
// Action that receives messages.
ACE_Message_Block *msg_frag_;
// Keep track of message fragment to handle non-blocking recv's from
// gateway.
size_t total_bytes_;
// The total number of bytes sent to/received from the gateway.
HANDLER_MAP *map_;
// Maps the ACE_HANDLE onto the Gateway_Handler *.  This is a pointer
// to a map declared elsewhere; see map() above, which caches the
// binding.
};
// A factory class that accepts connections from gatewayd and
// dynamically creates a new Gateway_Handler object to do the dirty work.
// NOTE(review): no access specifier follows `public:', so every member
// below -- including the data members -- is public.
class Gateway_Acceptor : public ACE_Acceptor<Gateway_Handler, ACE_SOCK_ACCEPTOR>
{
public:
// = Initialization methods, called when dynamically linked.
Gateway_Acceptor (Gateway_Handler *handler);
// Constructor taking a Gateway_Handler pointer.  NOTE(review):
// presumably cached in <gateway_handler_>; ownership is not visible
// from this header -- confirm against the implementation.
virtual int init (int argc, char *argv[]);
// Initialize the acceptor.
virtual int info (char **, size_t) const;
// Return info about this service.
virtual int fini (void);
// Perform termination.
virtual Gateway_Handler *make_svc_handler (void);
// Factory method that creates the Gateway_Handler once.
virtual int handle_signal (int signum, siginfo_t *, ucontext_t *);
// Handle various signals (e.g., SIGPIPE).
HANDLER_MAP map_;
// Maps the ACE_HANDLE onto the Gateway_Handler *.  Held by value.
Gateway_Handler *gateway_handler_;
// Pointer to memory allocated exactly once.
typedef ACE_Acceptor<Gateway_Handler, ACE_SOCK_ACCEPTOR> inherited;
// Shorthand name for the base class.
};
// Factory function that allocates a new Peer daemon and returns it as
// an ACE_Service_Object *.  Declared extern "C" so the symbol has C
// linkage.  NOTE(review): presumably so it can be looked up by name
// when the service is dynamically linked -- confirm against the
// service configuration.
extern "C" ACE_Service_Object *_alloc_peerd (void);
#endif /* GATEWAY_HANDLER */
|