blob: 52d3ea50d34de4285ec444d877d9bb3506c0199a (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
// -*- C++ -*-
//=============================================================================
/**
* @file Incoming_Message_Queue.h
*
* $Id$
*
* @author Balachandran Natarajan <bala@cs.wustl.edu>
*/
//=============================================================================
#ifndef TAO_INCOMING_MESSAGE_QUEUE_H
#define TAO_INCOMING_MESSAGE_QUEUE_H
#include "ace/pre.h"
#include "Pluggable_Messaging_Utils.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
/// Forward declarations
class ACE_Data_Block;
class ACE_Allocator;
class TAO_ORB_Core;
class TAO_Queued_Data;
class TAO_Transport;
/**
* @class TAO_Incoming_Message_Queue
*
* @brief A queue of the messages in the incoming data path.
*
* Please read the documentation in the TAO_Transport class to find
* out more about the design of the incoming data path.
*
* Under certain conditions TAO may have to maintain a queue
* per-connection. This queue is drained by the pluggable
* protocols framework, normally under control of the ACE_Reactor, but
* other configurations are conceivable.
*
* The memory that is allocated for holding the messages comes from
* the global pool for the following reasons
*
* - the thread that reads a part of the message would not be the same
* thread that reads and fills the rest of the message
* - the thread that actually processes the message can be totally
* different.
*
*/
class TAO_Export TAO_Incoming_Message_Queue
{
public:
/// Constructor.
TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core);
/// Destructor.
~TAO_Incoming_Message_Queue (void);
/// Adding and deleting a node from the queue.
TAO_Queued_Data *dequeue_head (void);
TAO_Queued_Data *dequeue_tail (void);
int enqueue_tail (TAO_Queued_Data *nd);
/// Copy message from <block> to the tail of the queue. The size
/// of message that is copied to the tail node is returned. The
/// number of bytes copied depends on the amount of bytes needed to
/// make the tail node consistent.
size_t copy_tail (ACE_Message_Block &block);
/// Return the length of the queue..
CORBA::ULong queue_length (void);
/// Methods for sanity check. Checks to see whether the node on the
/// head or tail is complete or not and ready for further
/// processing.
int is_tail_complete (void);
int is_head_complete (void);
/// This method checks whether the last message that was queued up
/// was fragmented...
int is_tail_fragmented (void);
/// Return the size of data that is missing in tail of the queue.
size_t missing_data_tail (void) const;
/// void missing_data (size_t data);
private:
friend class TAO_Transport;
/// Make a node for the queue.
TAO_Queued_Data *get_node (void);
private:
/// A linked listof messages that await processing
TAO_Queued_Data *queued_data_;
/// The size of the queue
CORBA::ULong size_;
/// Copy of our ORB Core
TAO_ORB_Core *orb_core_;
};
/************************************************************************/
/**
* @class TAO_Queued_Data
*
* @brief Represents a node in the queue of incoming messages.
*
* This class contains necessary information about a message that is
* stored in the queue. Such a node can be used by the incoming thread
* from the reactor to dequeue and process the message by sending it
* to the higher layers of the ORB.
*/
class TAO_Export TAO_Queued_Data
{
public:
/// Default Constructor
TAO_Queued_Data (ACE_Allocator *alloc = 0);
/// Constructor.
TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0);
/// Copy constructor.
TAO_Queued_Data (const TAO_Queued_Data &qd);
/// Creation and deletion of a node in the queue.
static TAO_Queued_Data* get_queued_data (ACE_Allocator *alloc = 0);
static void release (TAO_Queued_Data *qd);
/// Duplicate ourselves. This creates a copy of ourselves on the
/// heap and returns a pointer to the duplicated node.
static TAO_Queued_Data* duplicate (TAO_Queued_Data &qd);
public:
/// The message block that contains the message.
ACE_Message_Block *msg_block_;
/// Data missing in the above message that hasn't been read or
/// processed yet.
CORBA::Long missing_data_;
/// The byte order of the message that is stored in the node..
CORBA::Octet byte_order_;
/// Many protocols like GIOP have a major and minor version
/// information that would be needed to read and decipher the
/// message.
CORBA::Octet major_version_;
CORBA::Octet minor_version_;
/// Some messages can be fragmented by the protocol (this is an ORB
/// level fragmentation on top of the TCP/IP fragmentation. This
/// member indicates whether the message that we have recd. and
/// queue already has more fragments that is missing..
CORBA::Octet more_fragments_;
/// The message type of the message
TAO_Pluggable_Message_Type msg_type_;
/// Pounter to the next element in the queue.
TAO_Queued_Data *next_;
private:
/// Replace the datablock with a one allocated on the heap or
/// allocator
static void replace_data_block (ACE_Message_Block &mb);
private:
/// The allocator used to allocate this class.
ACE_Allocator *allocator_;
};
#if defined (__ACE_INLINE__)
# include "Incoming_Message_Queue.inl"
#endif /* __ACE_INLINE__ */
#include "ace/post.h"
#endif /*TAO_INCOMING_MESSAGE_QUEUE_H*/
|