summaryrefslogtreecommitdiff
path: root/cpp/src/framing/ChannelAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/framing/ChannelAdapter.cpp')
-rw-r--r--cpp/src/framing/ChannelAdapter.cpp22
1 files changed, 16 insertions, 6 deletions
diff --git a/cpp/src/framing/ChannelAdapter.cpp b/cpp/src/framing/ChannelAdapter.cpp
index 99a14f08fb..d16934a857 100644
--- a/cpp/src/framing/ChannelAdapter.cpp
+++ b/cpp/src/framing/ChannelAdapter.cpp
@@ -35,15 +35,19 @@ void ChannelAdapter::init(
version = v;
}
-RequestId ChannelAdapter::send(AMQBody::shared_ptr body) {
- RequestId result = 0;
+RequestId ChannelAdapter::send(
+ shared_ptr<AMQBody> body, Correlator::Action action)
+{
+ RequestId requestId = 0;
assertChannelOpen();
switch (body->type()) {
case REQUEST_BODY: {
AMQRequestBody::shared_ptr request =
boost::shared_polymorphic_downcast<AMQRequestBody>(body);
requester.sending(request->getData());
- result = request->getData().requestId;
+ requestId = request->getData().requestId;
+ if (!action.empty())
+ correlator.request(requestId, action);
break;
}
case RESPONSE_BODY: {
@@ -52,9 +56,10 @@ RequestId ChannelAdapter::send(AMQBody::shared_ptr body) {
responder.sending(response->getData());
break;
}
+ // No action required for other body types.
}
out->send(new AMQFrame(getVersion(), getId(), body));
- return result;
+ return requestId;
}
void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
@@ -66,10 +71,15 @@ void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
assertMethodOk(*response);
- // TODO aconway 2007-01-30: Consider a response handled on receipt.
- // Review - any cases where this is not the case?
AMQResponseBody::Data& responseData = response->getData();
+
+ // FIXME aconway 2007-04-05: processed should be last
+ // but causes problems with InProcessBroker tests because
+ // we execute client code in handleMethod.
+ // Need to introduce a queue & 2 threads for inprocess.
requester.processed(responseData);
+ // FIXME aconway 2007-04-04: exception handling.
+ correlator.response(response);
handleMethod(response);
}