diff options
Diffstat (limited to 'cpp/src/tests/InProcessBroker.h')
-rw-r--r-- | cpp/src/tests/InProcessBroker.h | 215 |
1 files changed, 141 insertions, 74 deletions
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h index 2a9f12771b..c5860568db 100644 --- a/cpp/src/tests/InProcessBroker.h +++ b/cpp/src/tests/InProcessBroker.h @@ -25,6 +25,9 @@ #include "qpid/client/Connector.h" #include "qpid/client/Connection.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/ConcurrentQueue.h" +#include "qpid/shared_ptr.h" #include <vector> #include <iostream> @@ -32,112 +35,176 @@ namespace qpid { -namespace broker { + /** - * A broker that implements client::Connector allowing direct - * in-process connection of client to broker. Used to write round-trip - * tests without requiring an external broker process. - * + * A client::Connector that connects directly to an in-process broker. * Also allows you to "snoop" on frames exchanged between client & broker. * * see FramingTest::testRequestResponseRoundtrip() for example of use. */ -class InProcessBroker : public client::Connector { +class InProcessConnector : + public client::Connector +{ public: + typedef sys::Mutex Mutex; + typedef Mutex::ScopedLock Lock; + typedef framing::FrameHandler FrameHandler; + typedef framing::AMQFrame AMQFrame; + enum Sender {CLIENT,BROKER}; - /** A frame tagged with the sender */ - struct TaggedFrame { - TaggedFrame(Sender e, framing::AMQFrame& f) : frame(f), sender(e) {} - bool fromBroker() const { return sender == BROKER; } - bool fromClient() const { return sender == CLIENT; } + /** Simulate the network thread of a peer with a queue and a thread. + * With setInputHandler(0) drops frames simulating network packet loss. + */ + class NetworkQueue : public sys::Runnable + { + public: + NetworkQueue(const char* r) : inputHandler(0), receiver(r) { + thread=sys::Thread(this); + } - template <class MethodType> - MethodType* asMethod() { - return dynamic_cast<MethodType*>(frame.getBody()); + ~NetworkQueue() { + queue.shutdown(); + thread.join(); } - framing::AMQFrame frame; - Sender sender; + + void push(AMQFrame& f) { queue.push(f); } + + void run() { + AMQFrame f; + while (queue.waitPop(f)) { + Lock l(lock); + if (inputHandler) { + QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f)); + inputHandler->handle(f); + } + else { + QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f)); + } + } + } + + void setInputHandler(FrameHandler* h) { + Lock l(lock); + inputHandler = h; + } + + private: + sys::Mutex lock; + sys::ConcurrentQueue<AMQFrame> queue; + sys::Thread thread; + FrameHandler* inputHandler; + const char* const receiver; }; - - typedef std::vector<TaggedFrame> Conversation; - - InProcessBroker(framing::ProtocolVersion ver= - framing::highestProtocolVersion - ) : - Connector(ver), - protocolInit(ver), - broker(broker::Broker::create()), - brokerOut(BROKER, conversation), + + struct InProcessHandler : public sys::ConnectionOutputHandler { + Sender from; + NetworkQueue queue; + const char* const sender; + + InProcessHandler(Sender s) + : from(s), + queue(from==CLIENT? "BROKER" : "CLIENT"), + sender(from==BROKER? "BROKER" : "CLIENT") + {} + + ~InProcessHandler() { } + + void send(AMQFrame& f) { + QPID_LOG(debug, QPID_MSG(sender << " SENT: " << f)); + queue.push(f); + } + + void close() { + // Do not shut down the queue here, we may be in + // the queue's dispatch thread. + } + }; + + InProcessConnector(shared_ptr<broker::Broker> b, + framing::ProtocolVersion v=framing::ProtocolVersion()) : + Connector(v), + protocolInit(v), + broker(b), + brokerOut(BROKER), brokerConnection(&brokerOut, *broker), - clientOut(CLIENT, conversation, &brokerConnection) - {} + clientOut(CLIENT), + isClosed(false) + { + clientOut.queue.setInputHandler(&brokerConnection); + } - ~InProcessBroker() { broker->shutdown(); } + ~InProcessConnector() { + close(); + + } void connect(const std::string& /*host*/, int /*port*/) {} + void init() { brokerConnection.initiated(protocolInit); } - void close() {} + + void close() { + if (!isClosed) { + isClosed = true; + brokerOut.close(); + clientOut.close(); + brokerConnection.closed(); + } + } /** Client's input handler. */ void setInputHandler(framing::InputHandler* handler) { - brokerOut.in = handler; + brokerOut.queue.setInputHandler(handler); } /** Called by client to send a frame */ void send(framing::AMQFrame& frame) { - clientOut.send(frame); + clientOut.handle(frame); } - /** Entire client-broker conversation is recorded here */ - Conversation conversation; + /** Sliently discard frames sent by either party, lost network traffic. */ + void discard() { + brokerOut.queue.setInputHandler(0); + clientOut.queue.setInputHandler(0); + } private: - /** OutputHandler that forwards data to an InputHandler */ - struct OutputToInputHandler : public sys::ConnectionOutputHandler { - OutputToInputHandler( - Sender sender_, Conversation& conversation_, - framing::InputHandler* ih=0 - ) : sender(sender_), conversation(conversation_), in(ih) {} - - void send(framing::AMQFrame& frame) { - QPID_LOG(debug, - (sender==CLIENT ? "CLIENT: " : "BROKER: ") << frame); - conversation.push_back(TaggedFrame(sender, frame)); - in->received(frame); - } - - void close() {} - - Sender sender; - Conversation& conversation; - framing::InputHandler* in; - }; - + sys::Mutex lock; framing::ProtocolInitiation protocolInit; - shared_ptr<Broker> broker; - OutputToInputHandler brokerOut; + shared_ptr<broker::Broker> broker; + InProcessHandler brokerOut; broker::Connection brokerConnection; - OutputToInputHandler clientOut; + InProcessHandler clientOut; + bool isClosed; }; -std::ostream& operator<<( - std::ostream& out, const InProcessBroker::TaggedFrame& tf) -{ - return out << (tf.fromBroker()? "BROKER: ":"CLIENT: ") << tf.frame; -} - -std::ostream& operator<<( - std::ostream& out, const InProcessBroker::Conversation& conv) -{ - copy(conv.begin(), conv.end(), - std::ostream_iterator<InProcessBroker::TaggedFrame>(out, "\n")); - return out; -} - -} // namespace broker -} // namespace qpid +struct InProcessConnection : public client::Connection { + InProcessConnection(shared_ptr<broker::Broker> b) + : client::Connection( + shared_ptr<client::Connector>( + new InProcessConnector(b))) + { + open(""); + } + + ~InProcessConnection() { } + + /** Simulate disconnected network connection. */ + void disconnect() { impl->getConnector()->close(); } + + /** Sliently discard frames sent by either party, lost network traffic. */ + void discard() { + dynamic_pointer_cast<InProcessConnector>( + impl->getConnector())->discard(); + } +}; +/** A connector with its own broker */ +struct InProcessBroker : public InProcessConnector { + InProcessBroker() : InProcessConnector(broker::Broker::create()) {} +}; + +} // namespace qpid #endif // _tests_InProcessBroker_h |