diff options
author | Gordon Sim <gsim@apache.org> | 2006-10-06 16:17:06 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-10-06 16:17:06 +0000 |
commit | 14654e5360b72adf1704838b3820c7d1fc860e8e (patch) | |
tree | 0342b1cedd2262809edb951fc234bc75deb20533 /cpp | |
parent | 55ad18a1c847c1b14d48c56ce7ee253aadf86ef7 (diff) | |
download | qpid-python-14654e5360b72adf1704838b3820c7d1fc860e8e.tar.gz |
Decoupled routing from the channel and message classes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@453657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/broker/inc/Channel.h | 48 | ||||
-rw-r--r-- | cpp/broker/inc/Message.h | 19 | ||||
-rw-r--r-- | cpp/broker/inc/Router.h | 39 | ||||
-rw-r--r-- | cpp/broker/src/Channel.cpp | 33 | ||||
-rw-r--r-- | cpp/broker/src/Message.cpp | 10 | ||||
-rw-r--r-- | cpp/broker/src/Router.cpp | 32 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 7 | ||||
-rw-r--r-- | cpp/broker/test/ChannelTest.cpp | 81 | ||||
-rw-r--r-- | cpp/broker/test/RouterTest.cpp | 91 | ||||
-rw-r--r-- | cpp/client/inc/IncomingMessage.h | 2 | ||||
-rw-r--r-- | cpp/client/src/IncomingMessage.cpp | 2 | ||||
-rw-r--r-- | cpp/common/framing/generated/stylesheets/utils.xsl | 4 |
12 files changed, 312 insertions, 56 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h index b965665772..e76c8a63e9 100644 --- a/cpp/broker/inc/Channel.h +++ b/cpp/broker/inc/Channel.h @@ -33,6 +33,10 @@ namespace qpid { namespace broker { + /** + * Maintains state for an AMQP channel. Handles incoming and + * outgoing messages for that channel. + */ class Channel{ private: class ConsumerImpl : public virtual Consumer{ @@ -98,7 +102,15 @@ namespace qpid { qpid::concurrent::MonitorImpl deliveryLock; void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected); - void publish(ExchangeRegistry* exchanges); + void checkMessage(const std::string& text); + + template<class Operation> void processMessage(Operation route){ + if(message->isComplete()){ + route(message); + message.reset(); + } + } + public: Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize); @@ -107,9 +119,6 @@ namespace qpid { inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; } inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = size; } inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount = count; } - void handlePublish(Message* msg); - void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges); - void handleContent(qpid::framing::AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges); bool exists(string& consumerTag); void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0); void cancel(string& tag); @@ -119,6 +128,37 @@ namespace qpid { void rollback(); void ack(u_int64_t deliveryTag, bool multiple); void recover(bool requeue); + + /** + * Handles the initial publish request though a + * channel. The header and (if applicable) content will be + * accumulated through calls to handleHeader() and + * handleContent() + */ + void handlePublish(Message* msg); + + /** + * A template method that handles a received header and if + * there is no content routes it using the functor passed + * in. + */ + template<class Operation> void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){ + checkMessage("Invalid message sequence: got header before publish."); + message->setHeader(header); + processMessage(route); + } + + /** + * A template method that handles a received content and + * if this completes the message, routes it using the + * functor passed in. + */ + template<class Operation> void handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){ + checkMessage("Invalid message sequence: got content before publish."); + message->addContent(content); + processMessage(route); + } + }; } } diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h index 7a239adace..8b3321c2dc 100644 --- a/cpp/broker/inc/Message.h +++ b/cpp/broker/inc/Message.h @@ -29,14 +29,19 @@ namespace qpid { namespace broker { class ExchangeRegistry; - + + /** + * Represents an AMQP message, i.e. a header body, a list of + * content bodies and some details about the publication + * request. + */ class Message{ typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list; typedef content_list::iterator content_iterator; const ConnectionToken* const publisher; - string exchange; - string routingKey; + const string exchange; + const string routingKey; const bool mandatory; const bool immediate; bool redelivered; @@ -44,8 +49,6 @@ namespace qpid { content_list content; u_int64_t contentSize(); - qpid::framing::BasicHeaderProperties* getHeaderProperties(); - public: typedef std::tr1::shared_ptr<Message> shared_ptr; @@ -64,10 +67,10 @@ namespace qpid { u_int32_t framesize); void redeliver(); - friend bool route(Message::shared_ptr& msg, ExchangeRegistry* registry); - + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + const string& getRoutingKey() const { return routingKey; } + const string& getExchange() const { return exchange; } }; - bool route(Message::shared_ptr& msg, ExchangeRegistry* registry); } } diff --git a/cpp/broker/inc/Router.h b/cpp/broker/inc/Router.h new file mode 100644 index 0000000000..d462b69832 --- /dev/null +++ b/cpp/broker/inc/Router.h @@ -0,0 +1,39 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Router_ +#define _Router_ + +#include "ExchangeRegistry.h" +#include "Message.h" + +/** + * A routing functor + */ +namespace qpid { + namespace broker { + class Router{ + ExchangeRegistry& registry; + public: + Router(ExchangeRegistry& registry); + void operator()(Message::shared_ptr& msg); + }; + } +} + + +#endif diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index 4fb6a52b99..ae99f4e7fa 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -126,38 +126,17 @@ void Channel::ConsumerImpl::cancel(){ if(queue) queue->cancel(this); } -void Channel::handlePublish(Message* msg){ - if(message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); - } - message = Message::shared_ptr(msg); -} - -void Channel::handleHeader(AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges){ +void Channel::checkMessage(const std::string& text){ if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish."); - } - message->setHeader(header); - if(message->isComplete()){ - publish(exchanges); + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text); } } -void Channel::handleContent(AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges){ - if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish."); - } - message->addContent(content); - if(message->isComplete()){ - publish(exchanges); - } -} - -void Channel::publish(ExchangeRegistry* exchanges){ - if(!route(message, exchanges)){ - std::cout << "WARNING: Could not route message." << std::endl; +void Channel::handlePublish(Message* msg){ + if(message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); } - message.reset(); + message = Message::shared_ptr(msg); } void Channel::ack(u_int64_t deliveryTag, bool multiple){ diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp index 8ebe40410a..a4ae85e904 100644 --- a/cpp/broker/src/Message.cpp +++ b/cpp/broker/src/Message.cpp @@ -90,13 +90,3 @@ const ConnectionToken* const Message::getPublisher(){ return publisher; } -bool qpid::broker::route(Message::shared_ptr& msg, ExchangeRegistry* registry){ - Exchange* exchange = registry->get(msg->exchange); - if(exchange){ - exchange->route(msg, msg->routingKey, &(msg->getHeaderProperties()->getHeaders())); - return true; - }else{ - return false; - } -} - diff --git a/cpp/broker/src/Router.cpp b/cpp/broker/src/Router.cpp new file mode 100644 index 0000000000..c2dd74bf7d --- /dev/null +++ b/cpp/broker/src/Router.cpp @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Router.h" + +using namespace qpid::broker; + +Router::Router(ExchangeRegistry& _registry) : registry(_registry){} + +void Router::operator()(Message::shared_ptr& msg){ + Exchange* exchange = registry.get(msg->getExchange()); + if(exchange){ + exchange->route(msg, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + }else{ + std::cout << "WARNING: Could not route message, unknown exchange: " << msg->getExchange() << std::endl; + } + +} diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index eb8f37030c..63a42a7fd6 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -18,8 +18,9 @@ #include <iostream> #include "SessionHandlerImpl.h" #include "FanOutExchange.h" -#include "TopicExchange.h" #include "HeadersExchange.h" +#include "Router.h" +#include "TopicExchange.h" #include "assert.h" using namespace std::tr1; @@ -153,11 +154,11 @@ void SessionHandlerImpl::closed(){ } void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - getChannel(channel)->handleHeader(body, exchanges); + getChannel(channel)->handleHeader(body, Router(*exchanges)); } void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - getChannel(channel)->handleContent(body, exchanges); + getChannel(channel)->handleContent(body, Router(*exchanges)); } void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ diff --git a/cpp/broker/test/ChannelTest.cpp b/cpp/broker/test/ChannelTest.cpp new file mode 100644 index 0000000000..73a1f97b46 --- /dev/null +++ b/cpp/broker/test/ChannelTest.cpp @@ -0,0 +1,81 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Channel.h" +#include "Message.h" +#include <cppunit/TestCase.h> +#include <cppunit/TextTestRunner.h> +#include <cppunit/extensions/HelperMacros.h> +#include <cppunit/plugin/TestPlugIn.h> +#include <iostream> +#include <memory> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::concurrent; + +struct MessageHolder{ + Message::shared_ptr last; +}; + +class DummyRouter{ + MessageHolder& holder; + +public: + DummyRouter(MessageHolder& _holder) : holder(_holder){ + } + + void operator()(Message::shared_ptr& msg){ + holder.last = msg; + } +}; + + +class ChannelTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ChannelTest); + CPPUNIT_TEST(testIncoming); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testIncoming(){ + Channel channel(0, 0, 10000); + string routingKey("my_routing_key"); + channel.handlePublish(new Message(0, "test", routingKey, false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + string data1("abcdefg"); + string data2("hijklmn"); + AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); + AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); + + MessageHolder holder; + channel.handleHeader(header, DummyRouter(holder)); + CPPUNIT_ASSERT(!holder.last); + channel.handleContent(part1, DummyRouter(holder)); + CPPUNIT_ASSERT(!holder.last); + channel.handleContent(part2, DummyRouter(holder)); + CPPUNIT_ASSERT(holder.last); + CPPUNIT_ASSERT_EQUAL(routingKey, holder.last->getRoutingKey()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest); + diff --git a/cpp/broker/test/RouterTest.cpp b/cpp/broker/test/RouterTest.cpp new file mode 100644 index 0000000000..284a28f583 --- /dev/null +++ b/cpp/broker/test/RouterTest.cpp @@ -0,0 +1,91 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Channel.h" +#include "Exchange.h" +#include "ExchangeRegistry.h" +#include "Message.h" +#include "Router.h" +#include <cppunit/TestCase.h> +#include <cppunit/TextTestRunner.h> +#include <cppunit/extensions/HelperMacros.h> +#include <cppunit/plugin/TestPlugIn.h> +#include <iostream> +#include <memory> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::concurrent; + +struct TestExchange : public Exchange{ + Message::shared_ptr msg; + string routingKey; + FieldTable* args; + + TestExchange() : Exchange("test"), args(0) {} + + void bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + } + + void unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + } + + void route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){ + this->msg = msg; + this->routingKey = routingKey; + this->args = args; + } +}; + +class RouterTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(RouterTest); + CPPUNIT_TEST(test); + CPPUNIT_TEST_SUITE_END(); + + public: + + void test() + { + ExchangeRegistry registry; + TestExchange* exchange = new TestExchange(); + registry.declare(exchange); + + string routingKey("my_routing_key"); + string name("name"); + string value("value"); + Message::shared_ptr msg(new Message(0, "test", routingKey, false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + + dynamic_cast<BasicHeaderProperties*>(header->getProperties())->getHeaders().setString(name, value); + msg->setHeader(header); + + Router router(registry); + router(msg); + + CPPUNIT_ASSERT(exchange->msg); + CPPUNIT_ASSERT_EQUAL(msg, exchange->msg); + CPPUNIT_ASSERT_EQUAL(routingKey, exchange->msg->getRoutingKey()); + CPPUNIT_ASSERT_EQUAL(routingKey, exchange->routingKey); + CPPUNIT_ASSERT_EQUAL(value, exchange->args->getString(name)); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(RouterTest); + diff --git a/cpp/client/inc/IncomingMessage.h b/cpp/client/inc/IncomingMessage.h index 1fee6af433..04b1fb40ba 100644 --- a/cpp/client/inc/IncomingMessage.h +++ b/cpp/client/inc/IncomingMessage.h @@ -47,7 +47,7 @@ namespace client { bool isReturn(); bool isDelivery(); bool isResponse(); - string& getConsumerTag();//only relevant if isDelivery() + const string& getConsumerTag();//only relevant if isDelivery() qpid::framing::AMQHeaderBody::shared_ptr& getHeader(); u_int64_t getDeliveryTag(); void getData(string& data); diff --git a/cpp/client/src/IncomingMessage.cpp b/cpp/client/src/IncomingMessage.cpp index 8e2604c4cb..c95a92b651 100644 --- a/cpp/client/src/IncomingMessage.cpp +++ b/cpp/client/src/IncomingMessage.cpp @@ -53,7 +53,7 @@ bool IncomingMessage::isResponse(){ return response; } -string& IncomingMessage::getConsumerTag(){ +const string& IncomingMessage::getConsumerTag(){ if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery"); return delivered->getConsumerTag(); } diff --git a/cpp/common/framing/generated/stylesheets/utils.xsl b/cpp/common/framing/generated/stylesheets/utils.xsl index 829d38433e..70743112a9 100644 --- a/cpp/common/framing/generated/stylesheets/utils.xsl +++ b/cpp/common/framing/generated/stylesheets/utils.xsl @@ -23,8 +23,8 @@ <xsl:choose> <xsl:when test="$t='octet'">u_int8_t</xsl:when> <xsl:when test="$t='short'">u_int16_t</xsl:when> - <xsl:when test="$t='shortstr'">string&</xsl:when> - <xsl:when test="$t='longstr'">string&</xsl:when> + <xsl:when test="$t='shortstr'">const string&</xsl:when> + <xsl:when test="$t='longstr'">const string&</xsl:when> <xsl:when test="$t='bit'">bool</xsl:when> <xsl:when test="$t='long'">u_int32_t</xsl:when> <xsl:when test="$t='longlong'">u_int64_t</xsl:when> |