summaryrefslogtreecommitdiff
path: root/cpp/lib/client/ResponseHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/client/ResponseHandler.cpp')
-rw-r--r--cpp/lib/client/ResponseHandler.cpp57
1 files changed, 25 insertions, 32 deletions
diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp
index 4498de41ae..926a9ce336 100644
--- a/cpp/lib/client/ResponseHandler.cpp
+++ b/cpp/lib/client/ResponseHandler.cpp
@@ -18,12 +18,10 @@
* under the License.
*
*/
-#include <boost/format.hpp>
-
-#include <ResponseHandler.h>
-#include <sys/Monitor.h>
#include <QpidError.h>
-#include "amqp_types.h"
+#include <boost/format.hpp>
+#include "ResponseHandler.h"
+#include "AMQMethodBody.h"
using namespace qpid::sys;
using namespace qpid::framing;
@@ -31,56 +29,51 @@ using namespace qpid::framing;
namespace qpid {
namespace client {
-ResponseHandler::ResponseHandler() : waiting(false){}
+ResponseHandler::ResponseHandler() : waiting(false), shutdownFlag(false) {}
ResponseHandler::~ResponseHandler(){}
-bool ResponseHandler::validate(ClassId c, MethodId m) {
- return response != 0 &&
- response->amqpClassId() ==c && response->amqpMethodId() == m;
+bool ResponseHandler::isWaiting() {
+ Monitor::ScopedLock l(monitor);
+ return waiting;
}
-void ResponseHandler::waitForResponse(){
+void ResponseHandler::expect(){
Monitor::ScopedLock l(monitor);
- while (waiting)
- monitor.wait();
+ waiting = true;
}
-void ResponseHandler::signalResponse(
- qpid::framing::AMQMethodBody::shared_ptr _response)
+void ResponseHandler::signalResponse(MethodPtr _response)
{
Monitor::ScopedLock l(monitor);
response = _response;
+ if (!response)
+ shutdownFlag=true;
waiting = false;
monitor.notify();
}
-void ResponseHandler::receive(ClassId c, MethodId m) {
+ResponseHandler::MethodPtr ResponseHandler::receive() {
Monitor::ScopedLock l(monitor);
- while (waiting)
+ while (!response && !shutdownFlag)
monitor.wait();
- getResponse(); // Check for closed.
- if(!validate(response->amqpClassId(), response->amqpMethodId())) {
+ if (shutdownFlag)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR, "Channel closed unexpectedly.");
+ MethodPtr result = response;
+ response.reset();
+ return result;
+}
+
+ResponseHandler::MethodPtr ResponseHandler::receive(ClassId c, MethodId m) {
+ MethodPtr response = receive();
+ if(c != response->amqpClassId() || m != response->amqpMethodId()) {
THROW_QPID_ERROR(
PROTOCOL_ERROR,
boost::format("Expected class:method %d:%d, got %d:%d")
% c % m % response->amqpClassId() % response->amqpMethodId());
}
-}
-
-framing::AMQMethodBody::shared_ptr ResponseHandler::getResponse() {
- if (!response)
- THROW_QPID_ERROR(
- PROTOCOL_ERROR, "Channel closed unexpectedly.");
return response;
}
-RequestId ResponseHandler::getRequestId() {
- assert(response->getRequestId());
- return response->getRequestId();
-}
-void ResponseHandler::expect(){
- waiting = true;
-}
-
}} // namespace qpid::client