summaryrefslogtreecommitdiff
path: root/cpp/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/tests')
-rw-r--r--cpp/tests/FramingTest.cpp51
-rw-r--r--cpp/tests/InProcessBroker.h153
2 files changed, 201 insertions, 3 deletions
diff --git a/cpp/tests/FramingTest.cpp b/cpp/tests/FramingTest.cpp
index 445d13e384..721d8ad857 100644
--- a/cpp/tests/FramingTest.cpp
+++ b/cpp/tests/FramingTest.cpp
@@ -18,6 +18,9 @@
* under the License.
*
*/
+#include <memory>
+#include <boost/lexical_cast.hpp>
+
#include <ConnectionRedirectBody.h>
#include <ProtocolVersion.h>
#include <amqp_framing.h>
@@ -25,15 +28,20 @@
#include <qpid_test_plugin.h>
#include <sstream>
#include <typeinfo>
+#include <QpidError.h>
#include <AMQP_HighestVersion.h>
#include "AMQRequestBody.h"
#include "AMQResponseBody.h"
#include "Requester.h"
#include "Responder.h"
-#include <QpidError.h>
+#include "InProcessBroker.h"
+#include "client/Connection.h"
+#include "client/ClientExchange.h"
+#include "client/ClientQueue.h"
+using namespace qpid;
using namespace qpid::framing;
-using qpid::QpidError;
+using namespace std;
template <class T>
std::string tostring(const T& x)
@@ -60,6 +68,7 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_TEST(testInlineContent);
CPPUNIT_TEST(testContentReference);
CPPUNIT_TEST(testContentValidation);
+ CPPUNIT_TEST(testRequestResponseRoundtrip);
CPPUNIT_TEST_SUITE_END();
private:
@@ -324,7 +333,43 @@ class FramingTest : public CppUnit::TestCase
// TODO aconway 2007-01-14: Test for batching when supported.
}
-};
+
+ // expect may contain null chars so use string(ptr,size) constructor
+ // Use sizeof(expect)-1 to strip the trailing null.
+#define ASSERT_FRAME(expect, frame) \
+ CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame))
+
+ void testRequestResponseRoundtrip() {
+ broker::InProcessBroker ibroker(version);
+ client::Connection clientConnection;
+ clientConnection.setConnector(ibroker);
+ clientConnection.open("");
+ client::Channel c;
+ clientConnection.openChannel(c);
+
+ client::Exchange exchange(
+ "MyExchange", client::Exchange::TOPIC_EXCHANGE);
+ client::Queue queue("MyQueue", true);
+ c.declareExchange(exchange);
+ c.declareQueue(queue);
+ c.bind(exchange, queue, "MyTopic", framing::FieldTable());
+ broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin();
+ ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=9; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=100; frameMax=65536; heartbeat=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=100; frameMax=65536; heartbeat=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: channelId=]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; passive=0; durable=0; autoDelete=0; internal=0; nowait=0; arguments={}]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=2,batch=0): ExchangeDeclareOk: ]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=2): QueueDeclare: ticket=0; queue=MyQueue; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; response(id=3,request=3,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=3): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; nowait=0; arguments={}]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; response(id=4,request=4,batch=0): QueueBindOk: ]", *i++);
+ }
+ };
// Make this test suite a plugin.
diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h
new file mode 100644
index 0000000000..4ef352e677
--- /dev/null
+++ b/cpp/tests/InProcessBroker.h
@@ -0,0 +1,153 @@
+#ifndef _tests_InProcessBroker_h
+#define _tests_InProcessBroker_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <vector>
+#include <iostream>
+#include <algorithm>
+
+#include "framing/AMQFrame.h"
+#include "broker/Broker.h"
+#include "broker/Connection.h"
+#include "client/Connector.h"
+
+namespace qpid {
+namespace broker {
+
+/** Make a copy of a frame body. Inefficient, only intended for tests. */
+// TODO aconway 2007-01-29: from should be const, need to fix
+// AMQPFrame::encode as const.
+framing::AMQFrame copy(framing::AMQFrame& from) {
+ framing::Buffer buffer(from.size());
+ from.encode(buffer);
+ buffer.flip();
+ framing::AMQFrame result;
+ result.decode(buffer);
+ return result;
+}
+
+/**
+ * A broker that implements client::Connector allowing direct
+ * in-process connection of client to broker. Used to write round-trip
+ * tests without requiring an external broker process.
+ *
+ * Also allows you to "snoop" on frames exchanged between client & broker.
+ *
+ * Use as follows:
+ *
+ \code
+ broker::InProcessBroker ibroker(version);
+ client::Connection clientConnection;
+ clientConnection.setConnector(ibroker);
+ clientConnection.open("");
+ ... use as normal
+ \endcode
+ *
+ */
+class InProcessBroker : public client::Connector {
+ public:
+ enum Sender {CLIENT,BROKER};
+ struct Frame : public framing::AMQFrame {
+ Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {}
+ bool fromBroker() const { return from == BROKER; }
+ bool fromClient() const { return from == CLIENT; }
+
+ template <class MethodType>
+ MethodType* asMethod() {
+ return dynamic_cast<MethodType*>(getBody().get());
+ }
+
+ Sender from;
+ };
+ typedef std::vector<Frame> Conversation;
+
+ InProcessBroker(const framing::ProtocolVersion& ver) :
+ Connector(ver),
+ protocolInit(ver),
+ broker(broker::Broker::create()),
+ brokerOut(BROKER, conversation),
+ brokerConnection(&brokerOut, *broker),
+ clientOut(CLIENT, conversation, &brokerConnection)
+ {}
+
+ void connect(const std::string& /*host*/, int /*port*/) {}
+ void init() { brokerConnection.initiated(&protocolInit); }
+ void close() {}
+
+ /** Client's input handler. */
+ void setInputHandler(framing::InputHandler* handler) {
+ brokerOut.in = handler;
+ }
+
+ /** Called by client to send a frame */
+ void send(framing::AMQFrame* frame) {
+ clientOut.send(frame);
+ }
+
+ /** Entire client-broker conversation is recorded here */
+ Conversation conversation;
+
+ private:
+ /** OutputHandler that forwards data to an InputHandler */
+ struct OutputToInputHandler : public sys::ConnectionOutputHandler {
+ OutputToInputHandler(
+ Sender from_, Conversation& conversation_,
+ framing::InputHandler* ih=0
+ ) : from(from_), conversation(conversation_), in(ih) {}
+
+ void send(framing::AMQFrame* frame) {
+ conversation.push_back(Frame(from, copy(*frame)));
+ in->received(frame);
+ }
+
+ void close() {}
+
+ Sender from;
+ Conversation& conversation;
+ framing::InputHandler* in;
+ };
+
+ framing::ProtocolInitiation protocolInit;
+ Broker::shared_ptr broker;
+ OutputToInputHandler brokerOut;
+ broker::Connection brokerConnection;
+ OutputToInputHandler clientOut;
+};
+
+std::ostream& operator<<(
+ std::ostream& out, const InProcessBroker::Frame& frame)
+{
+ return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") <<
+ static_cast<const framing::AMQFrame&>(frame);
+}
+std::ostream& operator<<(
+ std::ostream& out, const InProcessBroker::Conversation& conv)
+{
+ for (InProcessBroker::Conversation::const_iterator i = conv.begin();
+ i != conv.end(); ++i)
+ {
+ out << *i << std::endl;
+ }
+ return out;
+}
+
+
+}} // namespace qpid::broker
+
+#endif /*!_tests_InProcessBroker_h*/