summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/InProcessBroker.h54
-rw-r--r--cpp/src/tests/QueueTest.cpp34
2 files changed, 59 insertions, 29 deletions
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h
index c893e6906a..9fa0135502 100644
--- a/cpp/src/tests/InProcessBroker.h
+++ b/cpp/src/tests/InProcessBroker.h
@@ -36,6 +36,7 @@
namespace qpid {
+using qpid::sys::ConnectionInputHandler;
/**
* A client::Connector that connects directly to an in-process broker.
@@ -54,13 +55,21 @@ class InProcessConnector :
enum Sender {CLIENT,BROKER};
+ struct Task {
+ AMQFrame frame;
+ bool doOutput;
+
+ Task() : doOutput(true) {}
+ Task(AMQFrame& f) : frame(f), doOutput(false) {}
+ };
+
/** Simulate the network thread of a peer with a queue and a thread.
* With setInputHandler(0) drops frames simulating network packet loss.
*/
class NetworkQueue : public sys::Runnable
{
public:
- NetworkQueue(const char* r) : inputHandler(0), receiver(r) {
+ NetworkQueue(const char* r) : inputHandler(0), connectionHandler(0), receiver(r) {
thread=sys::Thread(this);
}
@@ -70,17 +79,24 @@ class InProcessConnector :
}
void push(AMQFrame& f) { queue.push(f); }
+ void activateOutput() { queue.push(Task()); }
void run() {
try {
while(true) {
- AMQFrame f = queue.pop();
- if (inputHandler) {
- QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f));
- inputHandler->handle(f);
+ Task t = queue.pop();
+ if (t.doOutput) {
+ if (connectionHandler) {
+ while (connectionHandler->doOutput());
+ }
+ } else {
+ if (inputHandler) {
+ QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << t.frame));
+ inputHandler->handle(t.frame);
+ }
+ else
+ QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << t.frame));
}
- else
- QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
}
}
catch (const ClosedException&) {
@@ -88,16 +104,24 @@ class InProcessConnector :
}
}
+ void setConnectionInputHandler(ConnectionInputHandler* h) {
+ Lock l(lock);
+ inputHandler = h;
+ connectionHandler = h;
+ }
+
void setInputHandler(FrameHandler* h) {
Lock l(lock);
inputHandler = h;
+ connectionHandler = 0;
}
private:
sys::Mutex lock;
- sys::BlockingQueue<AMQFrame> queue;
+ sys::BlockingQueue<Task> queue;
sys::Thread thread;
FrameHandler* inputHandler;
+ ConnectionInputHandler* connectionHandler;
const char* const receiver;
};
@@ -105,11 +129,13 @@ class InProcessConnector :
Sender from;
NetworkQueue queue;
const char* const sender;
+ NetworkQueue* reverseQueue;
InProcessHandler(Sender s)
: from(s),
queue(from==CLIENT? "BROKER" : "CLIENT"),
- sender(from==BROKER? "BROKER" : "CLIENT")
+ sender(from==BROKER? "BROKER" : "CLIENT"),
+ reverseQueue(0)
{}
~InProcessHandler() { }
@@ -123,6 +149,10 @@ class InProcessConnector :
// Do not shut down the queue here, we may be in
// the queue's dispatch thread.
}
+
+ void activateOutput() {
+ if (reverseQueue) reverseQueue->activateOutput();
+ }
};
InProcessConnector(shared_ptr<broker::Broker> b,
@@ -135,7 +165,9 @@ class InProcessConnector :
clientOut(CLIENT),
isClosed(false)
{
- clientOut.queue.setInputHandler(&brokerConnection);
+ clientOut.queue.setConnectionInputHandler(&brokerConnection);
+ brokerOut.reverseQueue = &clientOut.queue;
+ clientOut.reverseQueue = &brokerOut.queue;
}
~InProcessConnector() {
@@ -169,7 +201,7 @@ class InProcessConnector :
/** Sliently discard frames sent by either party, lost network traffic. */
void discard() {
brokerOut.queue.setInputHandler(0);
- clientOut.queue.setInputHandler(0);
+ clientOut.queue.setConnectionInputHandler(0);
}
private:
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 4714a998f6..7e757cfad0 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -47,6 +47,7 @@ public:
received = true;
return true;
};
+ void notify() {}
};
class FailOnDeliver : public Deliverable
@@ -88,7 +89,7 @@ class QueueTest : public CppUnit::TestCase
Queue::shared_ptr queue(new Queue("my_test_queue", true));
intrusive_ptr<Message> received;
- TestConsumer::shared_ptr c1(new TestConsumer());
+ TestConsumer c1;
queue->consume(c1);
@@ -98,7 +99,7 @@ class QueueTest : public CppUnit::TestCase
queue->process(msg1);
sleep(2);
- CPPUNIT_ASSERT(!c1->received);
+ CPPUNIT_ASSERT(!c1.received);
msg1->enqueueComplete();
received = queue->dequeue().payload;
@@ -127,8 +128,8 @@ class QueueTest : public CppUnit::TestCase
Queue::shared_ptr queue(new Queue("my_queue", true));
//Test adding consumers:
- TestConsumer::shared_ptr c1(new TestConsumer());
- TestConsumer::shared_ptr c2(new TestConsumer());
+ TestConsumer c1;
+ TestConsumer c2;
queue->consume(c1);
queue->consume(c2);
@@ -140,20 +141,17 @@ class QueueTest : public CppUnit::TestCase
intrusive_ptr<Message> msg3 = message("e", "C");
queue->deliver(msg1);
- if (!c1->received)
- sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg1.get(), c1->last.get());
+ CPPUNIT_ASSERT(queue->dispatch(c1));
+ CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
queue->deliver(msg2);
- if (!c2->received)
- sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg2.get(), c2->last.get());
+ CPPUNIT_ASSERT(queue->dispatch(c2));
+ CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
- c1->received = false;
+ c1.received = false;
queue->deliver(msg3);
- if (!c1->received)
- sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg3.get(), c1->last.get());
+ CPPUNIT_ASSERT(queue->dispatch(c1));
+ CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());
//Test cancellation:
queue->cancel(c1);
@@ -203,13 +201,13 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount());
- TestConsumer::shared_ptr consumer(new TestConsumer());
+ TestConsumer consumer;
queue->consume(consumer);
- queue->requestDispatch();
- if (!consumer->received)
+ queue->dispatch(consumer);
+ if (!consumer.received)
sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer->last.get());
+ CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
received = queue->dequeue().payload;