summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/InProcessBroker.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/InProcessBroker.h')
-rw-r--r--qpid/cpp/src/tests/InProcessBroker.h29
1 files changed, 16 insertions, 13 deletions
diff --git a/qpid/cpp/src/tests/InProcessBroker.h b/qpid/cpp/src/tests/InProcessBroker.h
index c5860568db..3f6ff0936e 100644
--- a/qpid/cpp/src/tests/InProcessBroker.h
+++ b/qpid/cpp/src/tests/InProcessBroker.h
@@ -26,7 +26,7 @@
#include "qpid/client/Connection.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Thread.h"
-#include "qpid/sys/ConcurrentQueue.h"
+#include "qpid/sys/BlockingQueue.h"
#include "qpid/shared_ptr.h"
#include <vector>
@@ -65,26 +65,29 @@ class InProcessConnector :
}
~NetworkQueue() {
- queue.shutdown();
+ queue.close();
thread.join();
}
void push(AMQFrame& f) { queue.push(f); }
void run() {
- AMQFrame f;
- while (queue.waitPop(f)) {
- Lock l(lock);
- if (inputHandler) {
- QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f));
- inputHandler->handle(f);
- }
- else {
- QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
+ try {
+ while(true) {
+ AMQFrame f = queue.pop();
+ if (inputHandler) {
+ QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f));
+ inputHandler->handle(f);
+ }
+ else
+ QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
}
}
+ catch (const sys::QueueClosed&) {
+ return;
+ }
}
-
+
void setInputHandler(FrameHandler* h) {
Lock l(lock);
inputHandler = h;
@@ -92,7 +95,7 @@ class InProcessConnector :
private:
sys::Mutex lock;
- sys::ConcurrentQueue<AMQFrame> queue;
+ sys::BlockingQueue<AMQFrame> queue;
sys::Thread thread;
FrameHandler* inputHandler;
const char* const receiver;