diff options
author | Alan Conway <aconway@apache.org> | 2007-01-30 18:20:00 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-01-30 18:20:00 +0000 |
commit | 98ccae7574a18f8d0a1f9e28e86ccfde4541c81f (patch) | |
tree | 528fe0c686b9193e66bdd222d0aee6c4705f34e7 /cpp | |
parent | 53d097bd7e565d08f902671180be58d4b2a9d843 (diff) | |
download | qpid-python-98ccae7574a18f8d0a1f9e28e86ccfde4541c81f.tar.gz |
* client/* framing/*: fixed client-side request ID processing.
* cpp/tests/InProcessBroker.h: For tests: connect to an in-process
broker directly, bypass the network. Keeps log of client/broker
conversation for verification in test code.
* cpp/tests/FramingTest.cpp (testRequestResponseRoundtrip):
Client/broker round-trip test for request/reponse IDs and response mark.
* APRAcceptor.cpp (APRAcceptor): fixed valgrind uninitialized error.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501502 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 39 | ||||
-rw-r--r-- | cpp/lib/client/ClientChannel.h | 8 | ||||
-rw-r--r-- | cpp/lib/client/Connection.cpp | 60 | ||||
-rw-r--r-- | cpp/lib/client/Connection.h | 26 | ||||
-rw-r--r-- | cpp/lib/client/Connector.h | 43 | ||||
-rw-r--r-- | cpp/lib/common/framing/ChannelAdapter.cpp | 9 | ||||
-rw-r--r-- | cpp/lib/common/framing/ChannelAdapter.h | 5 | ||||
-rw-r--r-- | cpp/lib/common/framing/MethodContext.h | 4 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/APRAcceptor.cpp | 5 | ||||
-rw-r--r-- | cpp/tests/FramingTest.cpp | 51 | ||||
-rw-r--r-- | cpp/tests/InProcessBroker.h | 153 |
11 files changed, 307 insertions, 96 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index b93596ebfc..a207763aac 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -85,13 +85,9 @@ void Channel::protocolInit( connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); **/ - connection->send( - new AMQFrame( - version, 0, - new ConnectionTuneOkBody( - version, proposal->getChannelMax(), - connection->getMaxFrameSize(), - proposal->getHeartbeat()))); + (new ConnectionTuneOkBody( + version, proposal->getChannelMax(), connection->getMaxFrameSize(), + proposal->getHeartbeat()))->send(context); u_int16_t heartbeat = proposal->getHeartbeat(); connection->connector->setReadTimeout(heartbeat * 2); @@ -100,9 +96,8 @@ void Channel::protocolInit( // Send connection open. std::string capabilities; responses.expect(); - send(new AMQFrame( - version, 0, - new ConnectionOpenBody(version, vhost, capabilities, true))); + (new ConnectionOpenBody(version, vhost, capabilities, true)) + ->send(context); //receive connection.open-ok (or redirect, but ignore that for now //esp. as using force=true). responses.waitForResponse(); @@ -213,7 +208,8 @@ void Channel::cancel(const std::string& tag, bool synch) { if (i != consumers.end()) { Consumer& c = i->second; if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - send(new BasicAckBody(version, c.lastDeliveryTag, true)); + (new BasicAckBody(version, c.lastDeliveryTag, true)) + ->send(context); sendAndReceiveSync<BasicCancelOkBody>( synch, new BasicCancelBody(version, tag, !synch)); consumers.erase(tag); @@ -231,7 +227,8 @@ void Channel::cancelAll(){ // trying the rest. NB no memory leaks if we do, // ConsumerMap holds values, not pointers. // - send(new BasicAckBody(version, c.lastDeliveryTag, true)); + (new BasicAckBody(version, c.lastDeliveryTag, true)) + ->send(context); } } } @@ -251,9 +248,8 @@ void Channel::retrieve(Message& msg){ bool Channel::get(Message& msg, const Queue& queue, int ackMode) { string name = queue.getName(); - AMQBody::shared_ptr body(new BasicGetBody(version, 0, name, ackMode)); responses.expect(); - send(body); + (new BasicGetBody(version, 0, name, ackMode))->send(context); responses.waitForResponse(); AMQMethodBody::shared_ptr response = responses.getResponse(); if(response->isA<BasicGetOkBody>()) { @@ -276,10 +272,12 @@ bool Channel::get(Message& msg, const Queue& queue, int ackMode) { void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ + // FIXME aconway 2007-01-30: Rework for message class. + string e = exchange.getName(); string key = routingKey; - send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); + (new BasicPublishBody(version, 0, e, key, mandatory, immediate))->send(context); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); @@ -428,7 +426,8 @@ void Channel::deliver(Consumer& consumer, Message& msg){ if(++(consumer.count) < prefetch) break; //else drop-through case AUTO_ACK: - send(new BasicAckBody(version, msg.getDeliveryTag(), multiple)); + (new BasicAckBody(version, msg.getDeliveryTag(), multiple)) + ->send(context); consumer.lastDeliveryTag = 0; } } @@ -510,20 +509,20 @@ void Channel::closeInternal() { dispatcher.join(); } -void Channel::sendAndReceive(AMQBody* toSend, ClassId c, MethodId m) +void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m) { responses.expect(); - send(toSend); + toSend->send(context); responses.receive(c, m); } void Channel::sendAndReceiveSync( - bool sync, AMQBody* body, ClassId c, MethodId m) + bool sync, AMQMethodBody* body, ClassId c, MethodId m) { if(sync) sendAndReceive(body, c, m); else - send(body); + body->send(context); } diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index 67274ddfc4..a34c95d2c4 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -124,21 +124,21 @@ class Channel : public framing::ChannelAdapter, const std::string& vhost); void sendAndReceive( - framing::AMQBody*, framing::ClassId, framing::MethodId); + framing::AMQMethodBody*, framing::ClassId, framing::MethodId); void sendAndReceiveSync( bool sync, - framing::AMQBody*, framing::ClassId, framing::MethodId); + framing::AMQMethodBody*, framing::ClassId, framing::MethodId); template <class BodyType> - boost::shared_ptr<BodyType> sendAndReceive(framing::AMQBody* body) { + boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) { sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID); return boost::shared_polymorphic_downcast<BodyType>( responses.getResponse()); } template <class BodyType> void sendAndReceiveSync( - bool sync, framing::AMQBody* body) { + bool sync, framing::AMQMethodBody* body) { sendAndReceiveSync( sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID); } diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 19d5cce7db..bf6c44570d 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -27,6 +27,8 @@ #include <iostream> #include <sstream> #include <MethodBodyInstances.h> +#include <boost/bind.hpp> +#include <functional> using namespace qpid::framing; using namespace qpid::sys; @@ -41,45 +43,59 @@ ChannelId Connection::channelIdCounter; const std::string Connection::OK("OK"); Connection::Connection( - bool debug, u_int32_t _max_frame_size, + bool _debug, u_int32_t _max_frame_size, const framing::ProtocolVersion& _version -) : max_frame_size(_max_frame_size), closed(true), - version(_version) -{ - connector = new Connector(version, debug, _max_frame_size); -} +) : version(_version), max_frame_size(_max_frame_size), + defaultConnector(version, debug, max_frame_size), + connector(&defaultConnector), + isOpen(false), debug(_debug) +{} Connection::~Connection(){ - delete connector; + close(); } -void Connection::open( - const std::string& _host, int _port, const std::string& uid, - const std::string& pwd, const std::string& virtualhost) +void Connection::setConnector(Connector& con) { - - host = _host; - port = _port; + connector = &con; connector->setInputHandler(this); connector->setTimeoutHandler(this); connector->setShutdownHandler(this); out = connector->getOutputHandler(); +} + +void Connection::open( + const std::string& host, int port, + const std::string& uid, const std::string& pwd, const std::string& vhost) +{ + if (isOpen) + THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); connector->connect(host, port); - - // Open the special channel 0. channels[0] = &channel0; channel0.open(0, *this); - channel0.protocolInit(uid, pwd, virtualhost); + channel0.protocolInit(uid, pwd, vhost); + isOpen = true; } +void Connection::shutdown() { + close(); +} + void Connection::close( ReplyCode code, const string& msg, ClassId classId, MethodId methodId ) { - if(!closed) { + if(isOpen) { + // TODO aconway 2007-01-29: Exception handling - could end up + // partly closed. + isOpen = false; channel0.sendAndReceive<ConnectionCloseOkBody>( new ConnectionCloseBody( getVersion(), code, msg, classId, methodId)); + while(!channels.empty()) { + channels.begin()->second->close(); + channels.erase(channels.begin()); + } connector->close(); } } @@ -140,14 +156,4 @@ void Connection::idleOut(){ out->send(new AMQFrame(version, 0, new AMQHeartbeatBody())); } -void Connection::shutdown(){ - closed = true; - //close all channels, also removes them from the map. - while(!channels.empty()){ - Channel* channel = channels.begin()->second; - if (channel != 0) - channel->close(); - } -} - }} // namespace qpid::client diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 6ee9e62e47..6a9a76eed2 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -1,5 +1,5 @@ -#ifndef _Connection_ -#define _Connection_ +#ifndef _client_Connection_ +#define _client_Connection_ /* * @@ -89,19 +89,19 @@ class Connection : public ConnectionForChannel static framing::ChannelId channelIdCounter; static const std::string OK; - std::string host; - int port; + framing::ProtocolVersion version; const u_int32_t max_frame_size; - ChannelMap channels; + ChannelMap channels; + Connector defaultConnector; Connector* connector; framing::OutputHandler* out; - volatile bool closed; - framing::ProtocolVersion version; + volatile bool isOpen; void erase(framing::ChannelId); void channelException( Channel&, framing::AMQMethodBody*, const QpidError&); Channel channel0; + bool debug; // TODO aconway 2007-01-26: too many friendships, untagle these classes. friend class Channel; @@ -145,10 +145,10 @@ class Connection : public ConnectionForChannel * within a single broker). */ void open(const std::string& host, int port = 5672, - const std::string& uid = "guest", const std::string& pwd = "guest", + const std::string& uid = "guest", + const std::string& pwd = "guest", const std::string& virtualhost = "/"); - /** * Close the connection with optional error information for the peer. * @@ -177,7 +177,10 @@ class Connection : public ConnectionForChannel void idleOut(); void idleIn(); void shutdown(); - + + /**\internal used for testing */ + void setConnector(Connector& connector); + /** * @return the maximum frame size in use on this connection */ @@ -187,8 +190,7 @@ class Connection : public ConnectionForChannel const framing::ProtocolVersion& getVersion() { return version; } }; -} -} +}} // namespace qpid::client #endif diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index 40663486f2..1126e861e0 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -37,13 +37,13 @@ namespace qpid { namespace client { -class Connector : public qpid::framing::OutputHandler, - private qpid::sys::Runnable +class Connector : public framing::OutputHandler, + private sys::Runnable { const bool debug; const int receive_buffer_size; const int send_buffer_size; - qpid::framing::ProtocolVersion version; + framing::ProtocolVersion version; bool closed; @@ -53,22 +53,22 @@ class Connector : public qpid::framing::OutputHandler, u_int32_t idleIn; u_int32_t idleOut; - qpid::sys::TimeoutHandler* timeoutHandler; - qpid::sys::ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; + sys::TimeoutHandler* timeoutHandler; + sys::ShutdownHandler* shutdownHandler; + framing::InputHandler* input; + framing::InitiationHandler* initialiser; + framing::OutputHandler* output; - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; + framing::Buffer inbuf; + framing::Buffer outbuf; - qpid::sys::Mutex writeLock; - qpid::sys::Thread receiver; + sys::Mutex writeLock; + sys::Thread receiver; - qpid::sys::Socket socket; + sys::Socket socket; void checkIdle(ssize_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); + void writeBlock(framing::AMQDataBlock* data); void writeToSocket(char* data, size_t available); void setSocketTimeout(); @@ -77,23 +77,22 @@ class Connector : public qpid::framing::OutputHandler, friend class Channel; public: - Connector(const qpid::framing::ProtocolVersion& pVersion, + Connector(const framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024); virtual ~Connector(); virtual void connect(const std::string& host, int port); virtual void init(); virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); - virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); + virtual void setInputHandler(framing::InputHandler* handler); + virtual void setTimeoutHandler(sys::TimeoutHandler* handler); + virtual void setShutdownHandler(sys::ShutdownHandler* handler); + virtual framing::OutputHandler* getOutputHandler(); + virtual void send(framing::AMQFrame* frame); virtual void setReadTimeout(u_int16_t timeout); virtual void setWriteTimeout(u_int16_t timeout); }; -} -} +}} #endif diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp index 59dc93c287..1fdb8d6691 100644 --- a/cpp/lib/common/framing/ChannelAdapter.cpp +++ b/cpp/lib/common/framing/ChannelAdapter.cpp @@ -29,6 +29,7 @@ void ChannelAdapter::init( id = i; out = &o; version = v; + context = MethodContext(id, this); } void ChannelAdapter::send(AMQFrame* frame) { @@ -58,19 +59,21 @@ void ChannelAdapter::send(AMQBody::shared_ptr body) { void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { assertMethodOk(*request); responder.received(request->getData()); - MethodContext context(id, this, request->getRequestId()); + context =MethodContext(id, this, request->getRequestId()); handleMethodInContext(request, context); } void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { assertMethodOk(*response); - handleMethod(response); + // TODO aconway 2007-01-30: Consider a response handled on receipt. + // Review - any cases where this is not the case? requester.processed(response->getData()); + handleMethod(response); } void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) { assertMethodOk(*method); - MethodContext context(id, this); + context = MethodContext(id, this); handleMethodInContext(method, context); } diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h index 60615740ad..b2a5ef6ff5 100644 --- a/cpp/lib/common/framing/ChannelAdapter.h +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -54,7 +54,7 @@ class ChannelAdapter : public BodyHandler, public OutputHandler { /** *@param output Processed frames are forwarded to this handler. */ - ChannelAdapter() : id(0), out(0) {} + ChannelAdapter() : context(0), id(0), out(0) {} /** Initialize the channel adapter. */ void init(ChannelId, OutputHandler&, const ProtocolVersion&); @@ -92,6 +92,9 @@ class ChannelAdapter : public BodyHandler, public OutputHandler { RequestId getRequestInProgress() { return requestInProgress; } + protected: + MethodContext context; + private: ChannelId id; OutputHandler* out; diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index 1aa4be8f1e..46d2e064b5 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -50,7 +50,7 @@ struct MethodContext : channelId(channel), out(output), requestId(request){} /** \internal Channel on which the method is sent. */ - const ChannelId channelId; + ChannelId channelId; /** Output handler for responses in this context */ OutputHandler* out; @@ -58,7 +58,7 @@ struct MethodContext /** \internal If we are in the context of processing an incoming request, * this is the ID. Otherwise it is 0. */ - const RequestId requestId; + RequestId requestId; }; diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp index 10f787f4fe..52384857ed 100644 --- a/cpp/lib/common/sys/apr/APRAcceptor.cpp +++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp @@ -56,10 +56,11 @@ Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bo // Must define Acceptor virtual dtor. Acceptor::~Acceptor() {} - APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) : +APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) : port(port_), trace(trace_), - processor(APRPool::get(), threads, 1000, 5000000) + processor(APRPool::get(), threads, 1000, 5000000), + running(false) { apr_sockaddr_t* address; CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); diff --git a/cpp/tests/FramingTest.cpp b/cpp/tests/FramingTest.cpp index 445d13e384..721d8ad857 100644 --- a/cpp/tests/FramingTest.cpp +++ b/cpp/tests/FramingTest.cpp @@ -18,6 +18,9 @@ * under the License. * */ +#include <memory> +#include <boost/lexical_cast.hpp> + #include <ConnectionRedirectBody.h> #include <ProtocolVersion.h> #include <amqp_framing.h> @@ -25,15 +28,20 @@ #include <qpid_test_plugin.h> #include <sstream> #include <typeinfo> +#include <QpidError.h> #include <AMQP_HighestVersion.h> #include "AMQRequestBody.h" #include "AMQResponseBody.h" #include "Requester.h" #include "Responder.h" -#include <QpidError.h> +#include "InProcessBroker.h" +#include "client/Connection.h" +#include "client/ClientExchange.h" +#include "client/ClientQueue.h" +using namespace qpid; using namespace qpid::framing; -using qpid::QpidError; +using namespace std; template <class T> std::string tostring(const T& x) @@ -60,6 +68,7 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_TEST(testInlineContent); CPPUNIT_TEST(testContentReference); CPPUNIT_TEST(testContentValidation); + CPPUNIT_TEST(testRequestResponseRoundtrip); CPPUNIT_TEST_SUITE_END(); private: @@ -324,7 +333,43 @@ class FramingTest : public CppUnit::TestCase // TODO aconway 2007-01-14: Test for batching when supported. } -}; + + // expect may contain null chars so use string(ptr,size) constructor + // Use sizeof(expect)-1 to strip the trailing null. +#define ASSERT_FRAME(expect, frame) \ + CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame)) + + void testRequestResponseRoundtrip() { + broker::InProcessBroker ibroker(version); + client::Connection clientConnection; + clientConnection.setConnector(ibroker); + clientConnection.open(""); + client::Channel c; + clientConnection.openChannel(c); + + client::Exchange exchange( + "MyExchange", client::Exchange::TOPIC_EXCHANGE); + client::Queue queue("MyQueue", true); + c.declareExchange(exchange); + c.declareQueue(queue); + c.bind(exchange, queue, "MyTopic", framing::FieldTable()); + broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin(); + ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=9; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=100; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=100; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: channelId=]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; passive=0; durable=0; autoDelete=0; internal=0; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=2,batch=0): ExchangeDeclareOk: ]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=2): QueueDeclare: ticket=0; queue=MyQueue; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=3,request=3,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=3): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=4,request=4,batch=0): QueueBindOk: ]", *i++); + } + }; // Make this test suite a plugin. diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h new file mode 100644 index 0000000000..4ef352e677 --- /dev/null +++ b/cpp/tests/InProcessBroker.h @@ -0,0 +1,153 @@ +#ifndef _tests_InProcessBroker_h +#define _tests_InProcessBroker_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <vector> +#include <iostream> +#include <algorithm> + +#include "framing/AMQFrame.h" +#include "broker/Broker.h" +#include "broker/Connection.h" +#include "client/Connector.h" + +namespace qpid { +namespace broker { + +/** Make a copy of a frame body. Inefficient, only intended for tests. */ +// TODO aconway 2007-01-29: from should be const, need to fix +// AMQPFrame::encode as const. +framing::AMQFrame copy(framing::AMQFrame& from) { + framing::Buffer buffer(from.size()); + from.encode(buffer); + buffer.flip(); + framing::AMQFrame result; + result.decode(buffer); + return result; +} + +/** + * A broker that implements client::Connector allowing direct + * in-process connection of client to broker. Used to write round-trip + * tests without requiring an external broker process. + * + * Also allows you to "snoop" on frames exchanged between client & broker. + * + * Use as follows: + * + \code + broker::InProcessBroker ibroker(version); + client::Connection clientConnection; + clientConnection.setConnector(ibroker); + clientConnection.open(""); + ... use as normal + \endcode + * + */ +class InProcessBroker : public client::Connector { + public: + enum Sender {CLIENT,BROKER}; + struct Frame : public framing::AMQFrame { + Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {} + bool fromBroker() const { return from == BROKER; } + bool fromClient() const { return from == CLIENT; } + + template <class MethodType> + MethodType* asMethod() { + return dynamic_cast<MethodType*>(getBody().get()); + } + + Sender from; + }; + typedef std::vector<Frame> Conversation; + + InProcessBroker(const framing::ProtocolVersion& ver) : + Connector(ver), + protocolInit(ver), + broker(broker::Broker::create()), + brokerOut(BROKER, conversation), + brokerConnection(&brokerOut, *broker), + clientOut(CLIENT, conversation, &brokerConnection) + {} + + void connect(const std::string& /*host*/, int /*port*/) {} + void init() { brokerConnection.initiated(&protocolInit); } + void close() {} + + /** Client's input handler. */ + void setInputHandler(framing::InputHandler* handler) { + brokerOut.in = handler; + } + + /** Called by client to send a frame */ + void send(framing::AMQFrame* frame) { + clientOut.send(frame); + } + + /** Entire client-broker conversation is recorded here */ + Conversation conversation; + + private: + /** OutputHandler that forwards data to an InputHandler */ + struct OutputToInputHandler : public sys::ConnectionOutputHandler { + OutputToInputHandler( + Sender from_, Conversation& conversation_, + framing::InputHandler* ih=0 + ) : from(from_), conversation(conversation_), in(ih) {} + + void send(framing::AMQFrame* frame) { + conversation.push_back(Frame(from, copy(*frame))); + in->received(frame); + } + + void close() {} + + Sender from; + Conversation& conversation; + framing::InputHandler* in; + }; + + framing::ProtocolInitiation protocolInit; + Broker::shared_ptr broker; + OutputToInputHandler brokerOut; + broker::Connection brokerConnection; + OutputToInputHandler clientOut; +}; + +std::ostream& operator<<( + std::ostream& out, const InProcessBroker::Frame& frame) +{ + return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") << + static_cast<const framing::AMQFrame&>(frame); +} +std::ostream& operator<<( + std::ostream& out, const InProcessBroker::Conversation& conv) +{ + for (InProcessBroker::Conversation::const_iterator i = conv.begin(); + i != conv.end(); ++i) + { + out << *i << std::endl; + } + return out; +} + + +}} // namespace qpid::broker + +#endif /*!_tests_InProcessBroker_h*/ |