diff options
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/InProcessBroker.h | 54 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 34 |
2 files changed, 59 insertions, 29 deletions
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h index c893e6906a..9fa0135502 100644 --- a/cpp/src/tests/InProcessBroker.h +++ b/cpp/src/tests/InProcessBroker.h @@ -36,6 +36,7 @@ namespace qpid { +using qpid::sys::ConnectionInputHandler; /** * A client::Connector that connects directly to an in-process broker. @@ -54,13 +55,21 @@ class InProcessConnector : enum Sender {CLIENT,BROKER}; + struct Task { + AMQFrame frame; + bool doOutput; + + Task() : doOutput(true) {} + Task(AMQFrame& f) : frame(f), doOutput(false) {} + }; + /** Simulate the network thread of a peer with a queue and a thread. * With setInputHandler(0) drops frames simulating network packet loss. */ class NetworkQueue : public sys::Runnable { public: - NetworkQueue(const char* r) : inputHandler(0), receiver(r) { + NetworkQueue(const char* r) : inputHandler(0), connectionHandler(0), receiver(r) { thread=sys::Thread(this); } @@ -70,17 +79,24 @@ class InProcessConnector : } void push(AMQFrame& f) { queue.push(f); } + void activateOutput() { queue.push(Task()); } void run() { try { while(true) { - AMQFrame f = queue.pop(); - if (inputHandler) { - QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f)); - inputHandler->handle(f); + Task t = queue.pop(); + if (t.doOutput) { + if (connectionHandler) { + while (connectionHandler->doOutput()); + } + } else { + if (inputHandler) { + QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << t.frame)); + inputHandler->handle(t.frame); + } + else + QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << t.frame)); } - else - QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f)); } } catch (const ClosedException&) { @@ -88,16 +104,24 @@ class InProcessConnector : } } + void setConnectionInputHandler(ConnectionInputHandler* h) { + Lock l(lock); + inputHandler = h; + connectionHandler = h; + } + void setInputHandler(FrameHandler* h) { Lock l(lock); inputHandler = h; + connectionHandler = 0; } private: sys::Mutex lock; - sys::BlockingQueue<AMQFrame> queue; + sys::BlockingQueue<Task> queue; sys::Thread thread; FrameHandler* inputHandler; + ConnectionInputHandler* connectionHandler; const char* const receiver; }; @@ -105,11 +129,13 @@ class InProcessConnector : Sender from; NetworkQueue queue; const char* const sender; + NetworkQueue* reverseQueue; InProcessHandler(Sender s) : from(s), queue(from==CLIENT? "BROKER" : "CLIENT"), - sender(from==BROKER? "BROKER" : "CLIENT") + sender(from==BROKER? "BROKER" : "CLIENT"), + reverseQueue(0) {} ~InProcessHandler() { } @@ -123,6 +149,10 @@ class InProcessConnector : // Do not shut down the queue here, we may be in // the queue's dispatch thread. } + + void activateOutput() { + if (reverseQueue) reverseQueue->activateOutput(); + } }; InProcessConnector(shared_ptr<broker::Broker> b, @@ -135,7 +165,9 @@ class InProcessConnector : clientOut(CLIENT), isClosed(false) { - clientOut.queue.setInputHandler(&brokerConnection); + clientOut.queue.setConnectionInputHandler(&brokerConnection); + brokerOut.reverseQueue = &clientOut.queue; + clientOut.reverseQueue = &brokerOut.queue; } ~InProcessConnector() { @@ -169,7 +201,7 @@ class InProcessConnector : /** Sliently discard frames sent by either party, lost network traffic. */ void discard() { brokerOut.queue.setInputHandler(0); - clientOut.queue.setInputHandler(0); + clientOut.queue.setConnectionInputHandler(0); } private: diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 4714a998f6..7e757cfad0 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -47,6 +47,7 @@ public: received = true; return true; }; + void notify() {} }; class FailOnDeliver : public Deliverable @@ -88,7 +89,7 @@ class QueueTest : public CppUnit::TestCase Queue::shared_ptr queue(new Queue("my_test_queue", true)); intrusive_ptr<Message> received; - TestConsumer::shared_ptr c1(new TestConsumer()); + TestConsumer c1; queue->consume(c1); @@ -98,7 +99,7 @@ class QueueTest : public CppUnit::TestCase queue->process(msg1); sleep(2); - CPPUNIT_ASSERT(!c1->received); + CPPUNIT_ASSERT(!c1.received); msg1->enqueueComplete(); received = queue->dequeue().payload; @@ -127,8 +128,8 @@ class QueueTest : public CppUnit::TestCase Queue::shared_ptr queue(new Queue("my_queue", true)); //Test adding consumers: - TestConsumer::shared_ptr c1(new TestConsumer()); - TestConsumer::shared_ptr c2(new TestConsumer()); + TestConsumer c1; + TestConsumer c2; queue->consume(c1); queue->consume(c2); @@ -140,20 +141,17 @@ class QueueTest : public CppUnit::TestCase intrusive_ptr<Message> msg3 = message("e", "C"); queue->deliver(msg1); - if (!c1->received) - sleep(2); - CPPUNIT_ASSERT_EQUAL(msg1.get(), c1->last.get()); + CPPUNIT_ASSERT(queue->dispatch(c1)); + CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); queue->deliver(msg2); - if (!c2->received) - sleep(2); - CPPUNIT_ASSERT_EQUAL(msg2.get(), c2->last.get()); + CPPUNIT_ASSERT(queue->dispatch(c2)); + CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); - c1->received = false; + c1.received = false; queue->deliver(msg3); - if (!c1->received) - sleep(2); - CPPUNIT_ASSERT_EQUAL(msg3.get(), c1->last.get()); + CPPUNIT_ASSERT(queue->dispatch(c1)); + CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get()); //Test cancellation: queue->cancel(c1); @@ -203,13 +201,13 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get()); CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount()); - TestConsumer::shared_ptr consumer(new TestConsumer()); + TestConsumer consumer; queue->consume(consumer); - queue->requestDispatch(); - if (!consumer->received) + queue->dispatch(consumer); + if (!consumer.received) sleep(2); - CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer->last.get()); + CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount()); received = queue->dequeue().payload; |