summaryrefslogtreecommitdiff
path: root/cpp/src/tests/InProcessBroker.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/InProcessBroker.h')
-rw-r--r--cpp/src/tests/InProcessBroker.h215
1 files changed, 141 insertions, 74 deletions
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h
index 2a9f12771b..c5860568db 100644
--- a/cpp/src/tests/InProcessBroker.h
+++ b/cpp/src/tests/InProcessBroker.h
@@ -25,6 +25,9 @@
#include "qpid/client/Connector.h"
#include "qpid/client/Connection.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/ConcurrentQueue.h"
+#include "qpid/shared_ptr.h"
#include <vector>
#include <iostream>
@@ -32,112 +35,176 @@
namespace qpid {
-namespace broker {
+
/**
- * A broker that implements client::Connector allowing direct
- * in-process connection of client to broker. Used to write round-trip
- * tests without requiring an external broker process.
- *
+ * A client::Connector that connects directly to an in-process broker.
* Also allows you to "snoop" on frames exchanged between client & broker.
*
* see FramingTest::testRequestResponseRoundtrip() for example of use.
*/
-class InProcessBroker : public client::Connector {
+class InProcessConnector :
+ public client::Connector
+{
public:
+ typedef sys::Mutex Mutex;
+ typedef Mutex::ScopedLock Lock;
+ typedef framing::FrameHandler FrameHandler;
+ typedef framing::AMQFrame AMQFrame;
+
enum Sender {CLIENT,BROKER};
- /** A frame tagged with the sender */
- struct TaggedFrame {
- TaggedFrame(Sender e, framing::AMQFrame& f) : frame(f), sender(e) {}
- bool fromBroker() const { return sender == BROKER; }
- bool fromClient() const { return sender == CLIENT; }
+ /** Simulate the network thread of a peer with a queue and a thread.
+ * With setInputHandler(0) drops frames simulating network packet loss.
+ */
+ class NetworkQueue : public sys::Runnable
+ {
+ public:
+ NetworkQueue(const char* r) : inputHandler(0), receiver(r) {
+ thread=sys::Thread(this);
+ }
- template <class MethodType>
- MethodType* asMethod() {
- return dynamic_cast<MethodType*>(frame.getBody());
+ ~NetworkQueue() {
+ queue.shutdown();
+ thread.join();
}
- framing::AMQFrame frame;
- Sender sender;
+
+ void push(AMQFrame& f) { queue.push(f); }
+
+ void run() {
+ AMQFrame f;
+ while (queue.waitPop(f)) {
+ Lock l(lock);
+ if (inputHandler) {
+ QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f));
+ inputHandler->handle(f);
+ }
+ else {
+ QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
+ }
+ }
+ }
+
+ void setInputHandler(FrameHandler* h) {
+ Lock l(lock);
+ inputHandler = h;
+ }
+
+ private:
+ sys::Mutex lock;
+ sys::ConcurrentQueue<AMQFrame> queue;
+ sys::Thread thread;
+ FrameHandler* inputHandler;
+ const char* const receiver;
};
-
- typedef std::vector<TaggedFrame> Conversation;
-
- InProcessBroker(framing::ProtocolVersion ver=
- framing::highestProtocolVersion
- ) :
- Connector(ver),
- protocolInit(ver),
- broker(broker::Broker::create()),
- brokerOut(BROKER, conversation),
+
+ struct InProcessHandler : public sys::ConnectionOutputHandler {
+ Sender from;
+ NetworkQueue queue;
+ const char* const sender;
+
+ InProcessHandler(Sender s)
+ : from(s),
+ queue(from==CLIENT? "BROKER" : "CLIENT"),
+ sender(from==BROKER? "BROKER" : "CLIENT")
+ {}
+
+ ~InProcessHandler() { }
+
+ void send(AMQFrame& f) {
+ QPID_LOG(debug, QPID_MSG(sender << " SENT: " << f));
+ queue.push(f);
+ }
+
+ void close() {
+ // Do not shut down the queue here, we may be in
+ // the queue's dispatch thread.
+ }
+ };
+
+ InProcessConnector(shared_ptr<broker::Broker> b,
+ framing::ProtocolVersion v=framing::ProtocolVersion()) :
+ Connector(v),
+ protocolInit(v),
+ broker(b),
+ brokerOut(BROKER),
brokerConnection(&brokerOut, *broker),
- clientOut(CLIENT, conversation, &brokerConnection)
- {}
+ clientOut(CLIENT),
+ isClosed(false)
+ {
+ clientOut.queue.setInputHandler(&brokerConnection);
+ }
- ~InProcessBroker() { broker->shutdown(); }
+ ~InProcessConnector() {
+ close();
+
+ }
void connect(const std::string& /*host*/, int /*port*/) {}
+
void init() { brokerConnection.initiated(protocolInit); }
- void close() {}
+
+ void close() {
+ if (!isClosed) {
+ isClosed = true;
+ brokerOut.close();
+ clientOut.close();
+ brokerConnection.closed();
+ }
+ }
/** Client's input handler. */
void setInputHandler(framing::InputHandler* handler) {
- brokerOut.in = handler;
+ brokerOut.queue.setInputHandler(handler);
}
/** Called by client to send a frame */
void send(framing::AMQFrame& frame) {
- clientOut.send(frame);
+ clientOut.handle(frame);
}
- /** Entire client-broker conversation is recorded here */
- Conversation conversation;
+ /** Sliently discard frames sent by either party, lost network traffic. */
+ void discard() {
+ brokerOut.queue.setInputHandler(0);
+ clientOut.queue.setInputHandler(0);
+ }
private:
- /** OutputHandler that forwards data to an InputHandler */
- struct OutputToInputHandler : public sys::ConnectionOutputHandler {
- OutputToInputHandler(
- Sender sender_, Conversation& conversation_,
- framing::InputHandler* ih=0
- ) : sender(sender_), conversation(conversation_), in(ih) {}
-
- void send(framing::AMQFrame& frame) {
- QPID_LOG(debug,
- (sender==CLIENT ? "CLIENT: " : "BROKER: ") << frame);
- conversation.push_back(TaggedFrame(sender, frame));
- in->received(frame);
- }
-
- void close() {}
-
- Sender sender;
- Conversation& conversation;
- framing::InputHandler* in;
- };
-
+ sys::Mutex lock;
framing::ProtocolInitiation protocolInit;
- shared_ptr<Broker> broker;
- OutputToInputHandler brokerOut;
+ shared_ptr<broker::Broker> broker;
+ InProcessHandler brokerOut;
broker::Connection brokerConnection;
- OutputToInputHandler clientOut;
+ InProcessHandler clientOut;
+ bool isClosed;
};
-std::ostream& operator<<(
- std::ostream& out, const InProcessBroker::TaggedFrame& tf)
-{
- return out << (tf.fromBroker()? "BROKER: ":"CLIENT: ") << tf.frame;
-}
-
-std::ostream& operator<<(
- std::ostream& out, const InProcessBroker::Conversation& conv)
-{
- copy(conv.begin(), conv.end(),
- std::ostream_iterator<InProcessBroker::TaggedFrame>(out, "\n"));
- return out;
-}
-
-} // namespace broker
-} // namespace qpid
+struct InProcessConnection : public client::Connection {
+ InProcessConnection(shared_ptr<broker::Broker> b)
+ : client::Connection(
+ shared_ptr<client::Connector>(
+ new InProcessConnector(b)))
+ {
+ open("");
+ }
+
+ ~InProcessConnection() { }
+
+ /** Simulate disconnected network connection. */
+ void disconnect() { impl->getConnector()->close(); }
+
+ /** Sliently discard frames sent by either party, lost network traffic. */
+ void discard() {
+ dynamic_pointer_cast<InProcessConnector>(
+ impl->getConnector())->discard();
+ }
+};
+/** A connector with its own broker */
+struct InProcessBroker : public InProcessConnector {
+ InProcessBroker() : InProcessConnector(broker::Broker::create()) {}
+};
+
+} // namespace qpid
#endif // _tests_InProcessBroker_h