summaryrefslogtreecommitdiff
path: root/cpp/broker/inc
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-10-06 16:17:06 +0000
committerGordon Sim <gsim@apache.org>2006-10-06 16:17:06 +0000
commit14654e5360b72adf1704838b3820c7d1fc860e8e (patch)
tree0342b1cedd2262809edb951fc234bc75deb20533 /cpp/broker/inc
parent55ad18a1c847c1b14d48c56ce7ee253aadf86ef7 (diff)
downloadqpid-python-14654e5360b72adf1704838b3820c7d1fc860e8e.tar.gz
Decoupled routing from the channel and message classes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@453657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/inc')
-rw-r--r--cpp/broker/inc/Channel.h48
-rw-r--r--cpp/broker/inc/Message.h19
-rw-r--r--cpp/broker/inc/Router.h39
3 files changed, 94 insertions, 12 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h
index b965665772..e76c8a63e9 100644
--- a/cpp/broker/inc/Channel.h
+++ b/cpp/broker/inc/Channel.h
@@ -33,6 +33,10 @@
namespace qpid {
namespace broker {
+ /**
+ * Maintains state for an AMQP channel. Handles incoming and
+ * outgoing messages for that channel.
+ */
class Channel{
private:
class ConsumerImpl : public virtual Consumer{
@@ -98,7 +102,15 @@ namespace qpid {
qpid::concurrent::MonitorImpl deliveryLock;
void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
- void publish(ExchangeRegistry* exchanges);
+ void checkMessage(const std::string& text);
+
+ template<class Operation> void processMessage(Operation route){
+ if(message->isComplete()){
+ route(message);
+ message.reset();
+ }
+ }
+
public:
Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
@@ -107,9 +119,6 @@ namespace qpid {
inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = size; }
inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount = count; }
- void handlePublish(Message* msg);
- void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges);
- void handleContent(qpid::framing::AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges);
bool exists(string& consumerTag);
void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0);
void cancel(string& tag);
@@ -119,6 +128,37 @@ namespace qpid {
void rollback();
void ack(u_int64_t deliveryTag, bool multiple);
void recover(bool requeue);
+
+ /**
+ * Handles the initial publish request though a
+ * channel. The header and (if applicable) content will be
+ * accumulated through calls to handleHeader() and
+ * handleContent()
+ */
+ void handlePublish(Message* msg);
+
+ /**
+ * A template method that handles a received header and if
+ * there is no content routes it using the functor passed
+ * in.
+ */
+ template<class Operation> void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
+ checkMessage("Invalid message sequence: got header before publish.");
+ message->setHeader(header);
+ processMessage(route);
+ }
+
+ /**
+ * A template method that handles a received content and
+ * if this completes the message, routes it using the
+ * functor passed in.
+ */
+ template<class Operation> void handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
+ checkMessage("Invalid message sequence: got content before publish.");
+ message->addContent(content);
+ processMessage(route);
+ }
+
};
}
}
diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h
index 7a239adace..8b3321c2dc 100644
--- a/cpp/broker/inc/Message.h
+++ b/cpp/broker/inc/Message.h
@@ -29,14 +29,19 @@
namespace qpid {
namespace broker {
class ExchangeRegistry;
-
+
+ /**
+ * Represents an AMQP message, i.e. a header body, a list of
+ * content bodies and some details about the publication
+ * request.
+ */
class Message{
typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
typedef content_list::iterator content_iterator;
const ConnectionToken* const publisher;
- string exchange;
- string routingKey;
+ const string exchange;
+ const string routingKey;
const bool mandatory;
const bool immediate;
bool redelivered;
@@ -44,8 +49,6 @@ namespace qpid {
content_list content;
u_int64_t contentSize();
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
-
public:
typedef std::tr1::shared_ptr<Message> shared_ptr;
@@ -64,10 +67,10 @@ namespace qpid {
u_int32_t framesize);
void redeliver();
- friend bool route(Message::shared_ptr& msg, ExchangeRegistry* registry);
-
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ const string& getRoutingKey() const { return routingKey; }
+ const string& getExchange() const { return exchange; }
};
- bool route(Message::shared_ptr& msg, ExchangeRegistry* registry);
}
}
diff --git a/cpp/broker/inc/Router.h b/cpp/broker/inc/Router.h
new file mode 100644
index 0000000000..d462b69832
--- /dev/null
+++ b/cpp/broker/inc/Router.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Router_
+#define _Router_
+
+#include "ExchangeRegistry.h"
+#include "Message.h"
+
+/**
+ * A routing functor
+ */
+namespace qpid {
+ namespace broker {
+ class Router{
+ ExchangeRegistry& registry;
+ public:
+ Router(ExchangeRegistry& registry);
+ void operator()(Message::shared_ptr& msg);
+ };
+ }
+}
+
+
+#endif