diff options
Diffstat (limited to 'cpp/tests')
-rw-r--r-- | cpp/tests/FramingTest.cpp | 51 | ||||
-rw-r--r-- | cpp/tests/InProcessBroker.h | 153 |
2 files changed, 201 insertions, 3 deletions
diff --git a/cpp/tests/FramingTest.cpp b/cpp/tests/FramingTest.cpp index 445d13e384..721d8ad857 100644 --- a/cpp/tests/FramingTest.cpp +++ b/cpp/tests/FramingTest.cpp @@ -18,6 +18,9 @@ * under the License. * */ +#include <memory> +#include <boost/lexical_cast.hpp> + #include <ConnectionRedirectBody.h> #include <ProtocolVersion.h> #include <amqp_framing.h> @@ -25,15 +28,20 @@ #include <qpid_test_plugin.h> #include <sstream> #include <typeinfo> +#include <QpidError.h> #include <AMQP_HighestVersion.h> #include "AMQRequestBody.h" #include "AMQResponseBody.h" #include "Requester.h" #include "Responder.h" -#include <QpidError.h> +#include "InProcessBroker.h" +#include "client/Connection.h" +#include "client/ClientExchange.h" +#include "client/ClientQueue.h" +using namespace qpid; using namespace qpid::framing; -using qpid::QpidError; +using namespace std; template <class T> std::string tostring(const T& x) @@ -60,6 +68,7 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_TEST(testInlineContent); CPPUNIT_TEST(testContentReference); CPPUNIT_TEST(testContentValidation); + CPPUNIT_TEST(testRequestResponseRoundtrip); CPPUNIT_TEST_SUITE_END(); private: @@ -324,7 +333,43 @@ class FramingTest : public CppUnit::TestCase // TODO aconway 2007-01-14: Test for batching when supported. } -}; + + // expect may contain null chars so use string(ptr,size) constructor + // Use sizeof(expect)-1 to strip the trailing null. +#define ASSERT_FRAME(expect, frame) \ + CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame)) + + void testRequestResponseRoundtrip() { + broker::InProcessBroker ibroker(version); + client::Connection clientConnection; + clientConnection.setConnector(ibroker); + clientConnection.open(""); + client::Channel c; + clientConnection.openChannel(c); + + client::Exchange exchange( + "MyExchange", client::Exchange::TOPIC_EXCHANGE); + client::Queue queue("MyQueue", true); + c.declareExchange(exchange); + c.declareQueue(queue); + c.bind(exchange, queue, "MyTopic", framing::FieldTable()); + broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin(); + ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=9; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=100; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=100; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: channelId=]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; passive=0; durable=0; autoDelete=0; internal=0; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=2,batch=0): ExchangeDeclareOk: ]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=2): QueueDeclare: ticket=0; queue=MyQueue; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=3,request=3,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=3): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=4,request=4,batch=0): QueueBindOk: ]", *i++); + } + }; // Make this test suite a plugin. diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h new file mode 100644 index 0000000000..4ef352e677 --- /dev/null +++ b/cpp/tests/InProcessBroker.h @@ -0,0 +1,153 @@ +#ifndef _tests_InProcessBroker_h +#define _tests_InProcessBroker_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <vector> +#include <iostream> +#include <algorithm> + +#include "framing/AMQFrame.h" +#include "broker/Broker.h" +#include "broker/Connection.h" +#include "client/Connector.h" + +namespace qpid { +namespace broker { + +/** Make a copy of a frame body. Inefficient, only intended for tests. */ +// TODO aconway 2007-01-29: from should be const, need to fix +// AMQPFrame::encode as const. +framing::AMQFrame copy(framing::AMQFrame& from) { + framing::Buffer buffer(from.size()); + from.encode(buffer); + buffer.flip(); + framing::AMQFrame result; + result.decode(buffer); + return result; +} + +/** + * A broker that implements client::Connector allowing direct + * in-process connection of client to broker. Used to write round-trip + * tests without requiring an external broker process. + * + * Also allows you to "snoop" on frames exchanged between client & broker. + * + * Use as follows: + * + \code + broker::InProcessBroker ibroker(version); + client::Connection clientConnection; + clientConnection.setConnector(ibroker); + clientConnection.open(""); + ... use as normal + \endcode + * + */ +class InProcessBroker : public client::Connector { + public: + enum Sender {CLIENT,BROKER}; + struct Frame : public framing::AMQFrame { + Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {} + bool fromBroker() const { return from == BROKER; } + bool fromClient() const { return from == CLIENT; } + + template <class MethodType> + MethodType* asMethod() { + return dynamic_cast<MethodType*>(getBody().get()); + } + + Sender from; + }; + typedef std::vector<Frame> Conversation; + + InProcessBroker(const framing::ProtocolVersion& ver) : + Connector(ver), + protocolInit(ver), + broker(broker::Broker::create()), + brokerOut(BROKER, conversation), + brokerConnection(&brokerOut, *broker), + clientOut(CLIENT, conversation, &brokerConnection) + {} + + void connect(const std::string& /*host*/, int /*port*/) {} + void init() { brokerConnection.initiated(&protocolInit); } + void close() {} + + /** Client's input handler. */ + void setInputHandler(framing::InputHandler* handler) { + brokerOut.in = handler; + } + + /** Called by client to send a frame */ + void send(framing::AMQFrame* frame) { + clientOut.send(frame); + } + + /** Entire client-broker conversation is recorded here */ + Conversation conversation; + + private: + /** OutputHandler that forwards data to an InputHandler */ + struct OutputToInputHandler : public sys::ConnectionOutputHandler { + OutputToInputHandler( + Sender from_, Conversation& conversation_, + framing::InputHandler* ih=0 + ) : from(from_), conversation(conversation_), in(ih) {} + + void send(framing::AMQFrame* frame) { + conversation.push_back(Frame(from, copy(*frame))); + in->received(frame); + } + + void close() {} + + Sender from; + Conversation& conversation; + framing::InputHandler* in; + }; + + framing::ProtocolInitiation protocolInit; + Broker::shared_ptr broker; + OutputToInputHandler brokerOut; + broker::Connection brokerConnection; + OutputToInputHandler clientOut; +}; + +std::ostream& operator<<( + std::ostream& out, const InProcessBroker::Frame& frame) +{ + return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") << + static_cast<const framing::AMQFrame&>(frame); +} +std::ostream& operator<<( + std::ostream& out, const InProcessBroker::Conversation& conv) +{ + for (InProcessBroker::Conversation::const_iterator i = conv.begin(); + i != conv.end(); ++i) + { + out << *i << std::endl; + } + return out; +} + + +}} // namespace qpid::broker + +#endif /*!_tests_InProcessBroker_h*/ |