1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#ifndef _TopicExchange_
#define _TopicExchange_
#include <map>
#include <vector>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
namespace broker {
class TopicExchange : public virtual Exchange {
struct TokenIterator;
class Normalizer;
struct BindingKey { // binding for this node
Binding::vector bindingVector;
FedBinding fedBinding;
};
// Binding database:
// The dotted form of a binding key is broken up and stored in a directed tree graph.
// Common binding prefix are merged. This allows the route match alogrithm to quickly
// isolate those sub-trees that match a given routingKey.
// For example, given the routes:
// a.b.c.<...>
// a.b.d.<...>
// a.x.y.<...>
// The resulting tree would be:
// a-->b-->c-->...
// | +-->d-->...
// +-->x-->y-->...
//
class BindingNode {
public:
typedef boost::shared_ptr<BindingNode> shared_ptr;
// for database transversal (visit a node).
class TreeIterator {
public:
TreeIterator() {};
virtual ~TreeIterator() {};
virtual bool visit(BindingNode& node) = 0;
};
BindingNode() {};
BindingNode(const std::string& token) : token(token) {};
QPID_BROKER_EXTERN virtual ~BindingNode();
// add normalizedRoute to tree, return associated BindingKey
QPID_BROKER_EXTERN BindingKey* addBindingKey(const std::string& normalizedRoute);
// return BindingKey associated with normalizedRoute
QPID_BROKER_EXTERN BindingKey* getBindingKey(const std::string& normalizedRoute);
// remove BindingKey associated with normalizedRoute
QPID_BROKER_EXTERN void removeBindingKey(const std::string& normalizedRoute);
// applies iter against each node in tree until iter returns false
QPID_BROKER_EXTERN bool iterateAll(TreeIterator& iter);
// applies iter against only matching nodes until iter returns false
QPID_BROKER_EXTERN bool iterateMatch(const std::string& routingKey, TreeIterator& iter);
std::string routePattern; // normalized binding that matches this node
BindingKey bindings; // for matches against this node
protected:
std::string token; // portion of pattern represented by this node
// children
typedef std::map<const std::string, BindingNode::shared_ptr> ChildMap;
ChildMap childTokens;
BindingNode::shared_ptr starChild; // "*" subtree
BindingNode::shared_ptr hashChild; // "#" subtree
unsigned int getChildCount() { return childTokens.size() +
(starChild ? 1 : 0) + (hashChild ? 1 : 0); }
BindingKey* addBindingKey(TokenIterator& bKey,
const std::string& fullPattern);
bool removeBindingKey(TokenIterator& bKey,
const std::string& fullPattern);
BindingKey* getBindingKey(TokenIterator& bKey);
QPID_BROKER_EXTERN virtual bool iterateMatch(TokenIterator& rKey, TreeIterator& iter);
bool iterateMatchChildren(const TokenIterator& key, TreeIterator& iter);
};
// Special case: ("*" token) Node in the tree for a match exactly one wildcard
class StarNode : public BindingNode {
public:
StarNode();
~StarNode() {};
protected:
virtual bool iterateMatch(TokenIterator& key, TreeIterator& iter);
};
// Special case: ("#" token) Node in the tree for a match zero or more
class HashNode : public BindingNode {
public:
HashNode();
~HashNode() {};
protected:
virtual bool iterateMatch(TokenIterator& key, TreeIterator& iter);
};
BindingNode bindingTree;
unsigned long nBindings;
qpid::sys::RWlock lock; // protects bindingTree and nBindings
qpid::sys::RWlock cacheLock; // protects cache
std::map<std::string, BindingList> bindingCache; // cache of matched routes.
class ClearCache {
private:
qpid::sys::RWlock* cacheLock;
std::map<std::string, BindingList>* bindingCache;
bool cleared;
public:
ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l),
bindingCache(bc),cleared(false) {};
void clearCache() {
qpid::sys::RWlock::ScopedWlock l(*cacheLock);
if (!cleared) {
bindingCache->clear();
cleared =true;
}
};
~ClearCache(){
clearCache();
};
};
BindingKey *getQueueBinding(Queue::shared_ptr queue, const std::string& pattern);
bool deleteBinding(Queue::shared_ptr queue,
const std::string& routingKey,
BindingKey *bk);
class ReOriginIter;
class BindingsFinderIter;
class QueueFinderIter;
public:
static const std::string typeName;
static QPID_BROKER_EXTERN std::string normalize(const std::string& pattern);
QPID_BROKER_EXTERN TopicExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_EXTERN TopicExchange(const std::string& _name,
bool _durable,
const qpid::framing::FieldTable& _args,
management::Manageable* parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
const qpid::framing::FieldTable* args);
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
const std::string& routingKey,
const qpid::framing::FieldTable* args);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
const std::string* const routingKey,
const qpid::framing::FieldTable* const args);
QPID_BROKER_EXTERN virtual ~TopicExchange();
virtual bool supportsDynamicBinding() { return true; }
class TopicExchangeTester;
friend class TopicExchangeTester;
};
}
}
#endif
|