1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
|
/* -*- C++ -*- */
//=============================================================================
/**
* @file MEM_IO.h
*
* $Id$
*
* @author Nanbor Wang <nanbor@cs.wustl.edu>
*/
//=============================================================================
#ifndef ACE_MEM_IO_H
#define ACE_MEM_IO_H
#include /**/ "ace/pre.h"
#include "ace/SOCK.h"
#include "ace/MEM_SAP.h"
#include "ace/Memory_Pool.h"
#include "ace/Message_Block.h"
#include "ace/Process_Semaphore.h"
#include "ace/Process_Mutex.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
#if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
/**
* @class ACE_Reactive_MEM_IO
*
* @brief ACE_MEM_SAP specialization that overrides init(), recv_buf()
* and send_buf() and adds get_buf_len().
*
* According to the method comments below, the location of the data is
* read off the socket with ACE::recv.
*
* NOTE(review): the implementation is not visible in this header.
* Presumably this class backs the ACE_MEM_IO::Reactive signal
* strategy -- confirm against the implementation file.
*/
class ACE_Export ACE_Reactive_MEM_IO : public ACE_MEM_SAP
{
public:
/// Constructor.
ACE_Reactive_MEM_IO (void);
/// Destructor.
virtual ~ACE_Reactive_MEM_IO (void);
/**
* Initialize the MEM_SAP object.
*
* @param handle  Handle to use (presumably the connected socket --
*                confirm against the implementation).
* @param name    Name passed to the initialization (presumably the
*                backing file name -- confirm).
* @param options Used to pass in the Malloc_Options to initialize the
*                underlying ACE_MMAP.
*/
virtual int init (ACE_HANDLE handle,
const ACE_TCHAR *name,
MALLOC_OPTIONS *options);
/**
* Fetch the location of the next available data into @a buf.
* As this operation reads the address of the data off the socket
* using ACE::recv, @a timeout only applies to ACE::recv.
*
* @param buf     Output: set to the node holding the received data.
* @param flags   Flags for the receive operation.
* @param timeout Maximum time to wait in ACE::recv.
*/
virtual ssize_t recv_buf (ACE_MEM_SAP_Node *&buf,
int flags,
const ACE_Time_Value *timeout);
/**
* Wait up to @a timeout amount of time to send @a buf. If <send>
* times out a -1 is returned with <errno == ETIME>. If it succeeds
* the number of bytes sent is returned.
*/
virtual ssize_t send_buf (ACE_MEM_SAP_Node *buf,
int flags,
const ACE_Time_Value *timeout);
/**
* Convert the buffer offset @a off to an absolute address and store
* it in @a buf.
*
* @return The size of the valid information contained in @a buf, or
*         -1 if <shm_malloc_> is not initialized.
*/
ssize_t get_buf_len (const off_t off, ACE_MEM_SAP_Node *&buf);
};
#if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
/**
* @class ACE_MT_MEM_IO
*
* @brief ACE_MEM_SAP specialization that keeps one Channel for
* receiving and one for sending; each Channel groups a process
* semaphore pointer, a process mutex pointer and a Simple_Queue.
*
* NOTE(review): the implementation is not visible in this header.
* Presumably this class backs the ACE_MEM_IO::MT signal strategy --
* confirm against the implementation file.
*/
class ACE_Export ACE_MT_MEM_IO : public ACE_MEM_SAP
{
public:
/// Head and tail node pointers of a simple queue.
typedef struct
{
ACE_MEM_SAP_Node::ACE_MEM_SAP_NODE_PTR head_;
ACE_MEM_SAP_Node::ACE_MEM_SAP_NODE_PTR tail_;
} MQ_Struct; // Structure for a simple queue
/**
* Queue of ACE_MEM_SAP_Node objects operating on an MQ_Struct.
*/
class Simple_Queue
{
public:
/// Default constructor.
Simple_Queue (void);
/// Construct the queue on top of @a mq.
Simple_Queue (MQ_Struct *mq);
/// Initialize the queue with the queue structure @a mq and the
/// allocator @a malloc.
int init (MQ_Struct *mq, ACE_MEM_SAP::MALLOC_TYPE *malloc);
/// Add @a new_msg to the queue.
/// NOTE(review): return-value convention is not visible here.
int write (ACE_MEM_SAP_Node *new_msg);
/// Read a node from the queue.
/// NOTE(review): presumably returns 0 when the queue is empty -- confirm.
ACE_MEM_SAP_Node *read (void);
private:
/// Queue structure this object operates on.
MQ_Struct *mq_;
/// Allocator associated with the queue.
ACE_MEM_SAP::MALLOC_TYPE *malloc_;
};
/// One direction of communication: a semaphore, a lock and a queue.
/// NOTE(review): ownership of <sema_> and <lock_> is not visible in
/// this header.
typedef struct
{
ACE_SYNCH_PROCESS_SEMAPHORE *sema_;
ACE_SYNCH_PROCESS_MUTEX *lock_;
Simple_Queue queue_;
} Channel;
/// Constructor.
ACE_MT_MEM_IO (void);
/// Destructor.
virtual ~ACE_MT_MEM_IO (void);
/**
* Initialize the MEM_SAP object.
*/
virtual int init (ACE_HANDLE handle,
const ACE_TCHAR *name,
MALLOC_OPTIONS *options);
/**
* Fetch the location of the next available data into @a buf.
* As this operation reads the address of the data off the socket
* using ACE::recv, @a timeout only applies to ACE::recv.
*
* NOTE(review): this description is identical to the one on
* ACE_Reactive_MEM_IO::recv_buf; whether this class really uses
* ACE::recv cannot be confirmed from this header.
*/
virtual ssize_t recv_buf (ACE_MEM_SAP_Node *&buf,
int flags,
const ACE_Time_Value *timeout);
/**
* Wait up to @a timeout amount of time to send @a buf. If <send>
* times out a -1 is returned with <errno == ETIME>. If it succeeds
* the number of bytes sent is returned.
*/
virtual ssize_t send_buf (ACE_MEM_SAP_Node *buf,
int flags,
const ACE_Time_Value *timeout);
private:
/// Channel used for receiving.
Channel recv_channel_;
/// Channel used for sending.
Channel send_channel_;
};
#endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */
/**
* @class ACE_MEM_IO
*
* @brief Defines the methods for the ACE shared memory wrapper I/O
* routines (e.g., send/recv).
* The shared memory transport uses the ACE_SOCK_* classes to
* implement the signaling mechanism so we can easily use the
* new mechanism with the Reactor pattern (which uses select
* under the hood.)
* ACE_MEM_Acceptor and ACE_MEM_Connector are used to establish
* connections. When a connection is established,
* ACE_MEM_Acceptor creates the MMAP file for data exchange and
* sends the location of the file (complete path name) to
* ACE_MEM_Connector thru the socket. ACE_MEM_Connector then
* reads the location of the file off the socket and opens up
* the same MMAP file. ACE_MEM_Stream at each side then
* contains a reference to the ACE_Malloc object using the same
* MMAP file.
* When sending information using methods provided in this
* class, ACE_MEM_IO requests a chunk of memory from the
* MALLOC_TYPE object, copies the data into the shared memory and
* sends the memory offset (from the start of the ACE_Malloc)
* across the socket. This action also serves as a signal to
* the other end. The receiving side then reverses the
* procedure and copies the information into the user buffer.
*/
class ACE_Export ACE_MEM_IO : public ACE_SOCK
{
public:
// = Initialization and termination methods.
/// Constructor.
ACE_MEM_IO (void);
/// Destructor.
~ACE_MEM_IO (void);
/// Selects the signaling strategy passed to init().
typedef enum
{
Reactive,
MT
} Signal_Strategy;
/**
* Initialize the MEM_SAP object.
*
* @param name    Name used for the initialization (presumably the
*                MMAP file name -- confirm against the implementation).
* @param type    Signaling strategy to use; defaults to
*                ACE_MEM_IO::Reactive.
* @param options Optional malloc options; defaults to 0.
*/
int init (const ACE_TCHAR *name,
ACE_MEM_IO::Signal_Strategy type = ACE_MEM_IO::Reactive,
ACE_MEM_SAP::MALLOC_OPTIONS *options = 0);
/**
* Finalize the MEM_IO object. This method doesn't invoke
* the <remove> method.
*/
int fini (void);
/// Send an @a n byte buffer to the other process using shm_malloc_
/// connected thru the socket.
ssize_t send (const void *buf,
size_t n,
int flags) ;
/// Recv an @a n byte buffer from the shm_malloc_ thru the connected
/// socket.
ssize_t recv (void *buf,
size_t n,
int flags) ;
/// Send an @a n byte buffer to the other process using shm_malloc_
/// connected thru the socket.
ssize_t send (const void *buf,
size_t n) ;
/// Recv an @a n byte buffer from the shm_malloc_ thru the connected
/// socket.
ssize_t recv (void *buf,
size_t n) ;
/**
* Wait up to @a timeout amount of time to send up to @a n bytes
* from @a buf (uses the <send> call). If <send> times out
* a -1 is returned with <errno == ETIME>. If it succeeds the
* number of bytes sent is returned.
*/
ssize_t send (const void *buf,
size_t n,
const ACE_Time_Value *timeout);
/**
* Wait up to @a timeout amount of time to send up to @a n bytes
* from @a buf (uses the <send> call). If <send> times out
* a -1 is returned with <errno == ETIME>. If it succeeds the
* number of bytes sent is returned.
*/
ssize_t send (const void *buf,
size_t n,
int flags,
const ACE_Time_Value *timeout);
/**
* Wait up to @a timeout amount of time to send the @a message_block.
* If <send> times out a -1 is returned with <errno == ETIME>. If
* it succeeds the number of bytes sent is returned.
*/
ssize_t send (const ACE_Message_Block *message_block,
const ACE_Time_Value *timeout);
/**
* Wait up to @a timeout amount of time to receive up to @a n bytes
* into @a buf (uses the <recv> call). If <recv> times
* out a -1 is returned with <errno == ETIME>. If it succeeds the
* number of bytes received is returned.
*/
ssize_t recv (void *buf,
size_t n,
const ACE_Time_Value *timeout);
/**
* Wait up to @a timeout amount of time to receive up to @a n bytes
* into @a buf (uses the <recv> call). If <recv> times
* out a -1 is returned with <errno == ETIME>. If it succeeds the
* number of bytes received is returned.
*/
ssize_t recv (void *buf,
size_t n,
int flags,
const ACE_Time_Value *timeout);
/// Dump the state of an object.
void dump (void) const;
/// Declare the dynamic allocation hooks.
ACE_ALLOC_HOOK_DECLARE;
/*
* Disabled accessors, kept for reference:
*
* /// Return the local endpoint port number. Returns 0 if successful,
* /// else -1.
* int get_local_port (u_short &) const;
*
* /// Return the port number of the remotely connected peer (if there
* /// is one). Returns 0 if successful, else -1.
* int get_remote_port (u_short &) const;
*/
private:
/// Internal helper taking receive flags and a timeout.
/// NOTE(review): presumably fetches the next buffer into
/// <recv_buffer_> and updates <buf_size_>/<cur_offset_>; the
/// implementation is not visible in this header -- confirm.
ssize_t fetch_recv_buf (int flag, const ACE_Time_Value *timeout);
/// Actual delivery mechanism.
ACE_MEM_SAP *deliver_strategy_;
/// Internal pointer for support recv/send.
ACE_MEM_SAP_Node *recv_buffer_;
/// Record the current total buffer size of <recv_buffer_>.
ssize_t buf_size_;
/// Record the current read pointer location in <recv_buffer_>.
ssize_t cur_offset_;
};
#if defined (__ACE_INLINE__)
#include "ace/MEM_IO.inl"
#endif /* __ACE_INLINE__ */
#endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */
#include /**/ "ace/post.h"
#endif /* ACE_MEM_IO_H */
|