summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-08-14 14:29:07 +0000
committerAlan Conway <aconway@apache.org>2007-08-14 14:29:07 +0000
commit284aec62741c7dc069c8e7199a5ce3bf61c277ff (patch)
tree774d7dfd17a7a0ca562f1a0788b028984d6f0b3a
parent9d660da0bad7c37cb55d4037711265fd0b3b7e9d (diff)
downloadqpid-python-284aec62741c7dc069c8e7199a5ce3bf61c277ff.tar.gz
Deleted following files that are obsolete for 0-10:
src/qpid/framing/AMQRequestBody.cpp src/qpid/framing/AMQRequestBody.h src/qpid/framing/AMQResponseBody.cpp src/qpid/framing/AMQResponseBody.h src/qpid/framing/Correlator.cpp src/qpid/framing/Correlator.h src/qpid/framing/MethodContext.cpp src/qpid/framing/Requester.cpp src/qpid/framing/Requester.h src/qpid/framing/Responder.cpp src/qpid/framing/Responder.h Made changes to support their deletion. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@565770 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am11
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticHandler.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticHandler.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/SessionManager.cpp2
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.cpp9
-rw-r--r--qpid/cpp/src/qpid/framing/AMQRequestBody.cpp66
-rw-r--r--qpid/cpp/src/qpid/framing/AMQRequestBody.h78
-rw-r--r--qpid/cpp/src/qpid/framing/AMQResponseBody.cpp65
-rw-r--r--qpid/cpp/src/qpid/framing/AMQResponseBody.h85
-rw-r--r--qpid/cpp/src/qpid/framing/BodyHandler.cpp8
-rw-r--r--qpid/cpp/src/qpid/framing/BodyHandler.h9
-rw-r--r--qpid/cpp/src/qpid/framing/ChannelAdapter.cpp42
-rw-r--r--qpid/cpp/src/qpid/framing/ChannelAdapter.h19
-rw-r--r--qpid/cpp/src/qpid/framing/Correlator.cpp43
-rw-r--r--qpid/cpp/src/qpid/framing/Correlator.h68
-rw-r--r--qpid/cpp/src/qpid/framing/Frame.cpp2
-rw-r--r--qpid/cpp/src/qpid/framing/FramingContent.h2
-rw-r--r--qpid/cpp/src/qpid/framing/MethodContext.cpp34
-rw-r--r--qpid/cpp/src/qpid/framing/MethodContext.h6
-rw-r--r--qpid/cpp/src/qpid/framing/Requester.cpp40
-rw-r--r--qpid/cpp/src/qpid/framing/Requester.h68
-rw-r--r--qpid/cpp/src/qpid/framing/Responder.cpp43
-rw-r--r--qpid/cpp/src/qpid/framing/Responder.h61
-rw-r--r--qpid/cpp/src/tests/FramingTest.cpp170
26 files changed, 14 insertions, 932 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 118a60ec70..690f939e71 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -135,15 +135,12 @@ libqpidcommon_la_LIBADD = \
libqpidcommon_la_SOURCES = \
$(platform_src) \
qpid/framing/AMQBody.cpp \
- qpid/framing/AMQRequestBody.cpp \
- qpid/framing/AMQResponseBody.cpp \
qpid/framing/AMQContentBody.cpp \
qpid/framing/AMQFrame.cpp \
qpid/framing/AMQHeaderBody.cpp \
qpid/framing/AMQHeartbeatBody.cpp \
qpid/framing/AMQMethodBody.cpp \
qpid/framing/FrameHandler.h \
- qpid/framing/MethodContext.cpp \
qpid/framing/BasicHeaderProperties.cpp \
qpid/framing/BodyHandler.cpp \
qpid/framing/ChannelAdapter.cpp \
@@ -154,11 +151,8 @@ libqpidcommon_la_SOURCES = \
qpid/framing/ProtocolInitiation.cpp \
qpid/framing/ProtocolVersion.cpp \
qpid/framing/ProtocolVersionException.cpp \
- qpid/framing/Requester.cpp \
- qpid/framing/Responder.cpp \
qpid/framing/SequenceNumber.cpp \
qpid/framing/SequenceNumberSet.cpp \
- qpid/framing/Correlator.cpp \
qpid/framing/Value.cpp \
qpid/framing/Proxy.cpp \
qpid/framing/Uuid.cpp \
@@ -366,13 +360,10 @@ nobase_include_HEADERS = \
qpid/framing/AMQHeaderBody.h \
qpid/framing/AMQHeartbeatBody.h \
qpid/framing/AMQMethodBody.h \
- qpid/framing/AMQRequestBody.h \
- qpid/framing/AMQResponseBody.h \
qpid/framing/BasicHeaderProperties.h \
qpid/framing/BodyHandler.h \
qpid/framing/Buffer.h \
qpid/framing/ChannelAdapter.h \
- qpid/framing/Correlator.h \
qpid/framing/FieldTable.h \
qpid/framing/FramingContent.h \
qpid/framing/HeaderProperties.h \
@@ -385,8 +376,6 @@ nobase_include_HEADERS = \
qpid/framing/ProtocolVersion.h \
qpid/framing/ProtocolVersionException.h \
qpid/framing/Proxy.h \
- qpid/framing/Requester.h \
- qpid/framing/Responder.h \
qpid/framing/SerializeHandler.h \
qpid/framing/SequenceNumber.h \
qpid/framing/SequenceNumberSet.h \
diff --git a/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp b/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp
index 65933660f1..4fa4b2c238 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp
@@ -50,14 +50,10 @@ void ConnectionAdapter::handleMethodInContext(
)
{
try{
- handler->client.setResponseTo(context.getRequestId());
method->invoke(*this, context);
- handler->client.setResponseTo(0);
}catch(ConnectionException& e){
- handler->client.setResponseTo(0);
handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
- handler->client.setResponseTo(0);
handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
}
diff --git a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index c728a800ab..70f7c3b8ec 100644
--- a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -162,10 +162,9 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context)
MessageTransferBody::shared_ptr transfer(
boost::shared_polymorphic_downcast<MessageTransferBody>(
context.methodBody));
- RequestId requestId = context.getRequestId();
if (transfer->getBody().isInline()) {
- MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer));
+ MessageMessage::shared_ptr message(new MessageMessage(&connection, 0, transfer));
channel.handleInlineTransfer(message);
} else {
throw ConnectionException(540, "References no longer supported");
diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
index 08f91bf69a..fd0a5cfbe1 100644
--- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -171,16 +171,16 @@ void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_
msg->deliver(*this, tag, token, connection.getFrameMax());
}
-RequestId SemanticHandler::send(shared_ptr<AMQBody> body, Correlator::Action action)
+RequestId SemanticHandler::send(shared_ptr<AMQBody> body)
{
Mutex::ScopedLock l(outLock);
uint8_t type(body->type());
- if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) {
+ if (type == METHOD_BODY) {
//temporary hack until channel management is moved to its own handler:
if (dynamic_pointer_cast<AMQMethodBody>(body)->amqpClassId() != ChannelOpenBody::CLASS_ID) {
++outgoing.hwm;
//std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
}
}
- return ChannelAdapter::send(body, action);
+ return ChannelAdapter::send(body);
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h
index b863b3486e..7d5d95243e 100644
--- a/qpid/cpp/src/qpid/broker/SemanticHandler.h
+++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h
@@ -59,7 +59,7 @@ class SemanticHandler : private framing::ChannelAdapter,
void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
- framing::RequestId send(shared_ptr<framing::AMQBody> body, framing::Correlator::Action action=framing::Correlator::Action());
+ framing::RequestId send(shared_ptr<framing::AMQBody> body);
//delivery adapter methods:
diff --git a/qpid/cpp/src/qpid/cluster/SessionManager.cpp b/qpid/cpp/src/qpid/cluster/SessionManager.cpp
index c9e79b4bbc..715ed7817c 100644
--- a/qpid/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/qpid/cpp/src/qpid/cluster/SessionManager.cpp
@@ -67,7 +67,7 @@ using namespace broker;
virtual bool isOpen() const{ return true; }
virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){}
// No-op send.
- virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; }
+ virtual RequestId send(shared_ptr<AMQBody>) { return 0; }
//delivery adapter methods, also no-ops:
virtual DeliveryId deliver(Message::shared_ptr&, DeliveryToken::shared_ptr) { return 0; }
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp
index 778c9ab505..f79eae3524 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp
@@ -23,8 +23,7 @@
#include "AMQFrame.h"
#include "qpid/QpidError.h"
-#include "AMQRequestBody.h"
-#include "AMQResponseBody.h"
+#include "AMQMethodBody.h"
namespace qpid {
@@ -91,12 +90,6 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size)
case METHOD_BODY:
body = AMQMethodBody::create(versionMap, version, buffer);
break;
- case REQUEST_BODY:
- body = AMQRequestBody::create(versionMap, version, buffer);
- break;
- case RESPONSE_BODY:
- body = AMQResponseBody::create(versionMap, version, buffer);
- break;
case HEADER_BODY:
body = AMQBody::shared_ptr(new AMQHeaderBody());
break;
diff --git a/qpid/cpp/src/qpid/framing/AMQRequestBody.cpp b/qpid/cpp/src/qpid/framing/AMQRequestBody.cpp
deleted file mode 100644
index 9e7b307a47..0000000000
--- a/qpid/cpp/src/qpid/framing/AMQRequestBody.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "AMQRequestBody.h"
-#include "qpid/framing/AMQP_MethodVersionMap.h"
-
-namespace qpid {
-namespace framing {
-
-void AMQRequestBody::Data::encode(Buffer& buffer) const {
- buffer.putLongLong(requestId);
- buffer.putLongLong(responseMark);
- buffer.putLong(0); // Reserved long in spec.
-}
-
-void AMQRequestBody::Data::decode(Buffer& buffer) {
- requestId = buffer.getLongLong();
- responseMark = buffer.getLongLong();
- buffer.getLong(); // Ignore reserved long.
-}
-
-void AMQRequestBody::encode(Buffer& buffer) const {
- data.encode(buffer);
- encodeId(buffer);
- encodeContent(buffer);
-}
-
-AMQRequestBody::shared_ptr
-AMQRequestBody::create(
- AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
- Buffer& buffer)
-{
- ClassMethodId id;
- Data data;
- data.decode(buffer);
- id.decode(buffer);
- AMQRequestBody* body = dynamic_cast<AMQRequestBody*>(
- versionMap.createMethodBody(
- id.classId, id.methodId, version.getMajor(), version.getMinor()));
- assert(body);
- body->data = data;
- return AMQRequestBody::shared_ptr(body);
-}
-
-void AMQRequestBody::printPrefix(std::ostream& out) const {
- out << "request(id=" << data.requestId << ",mark="
- << data.responseMark << "): ";
-}
-
-}} // namespace qpid::framing
-
diff --git a/qpid/cpp/src/qpid/framing/AMQRequestBody.h b/qpid/cpp/src/qpid/framing/AMQRequestBody.h
deleted file mode 100644
index 691a4c8768..0000000000
--- a/qpid/cpp/src/qpid/framing/AMQRequestBody.h
+++ /dev/null
@@ -1,78 +0,0 @@
-#ifndef _framing_AMQRequestBody_h
-#define _framing_AMQRequestBody_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/AMQMethodBody.h"
-
-namespace qpid {
-namespace framing {
-
-/**
- * Body of a request method frame.
- */
-class AMQRequestBody : public AMQMethodBody
-{
- public:
- typedef boost::shared_ptr<AMQRequestBody> shared_ptr;
-
- struct Data {
- Data(RequestId id=0, ResponseId mark=0)
- : requestId(id), responseMark(mark) {}
- void encode(Buffer&) const;
- void decode(Buffer&);
-
- RequestId requestId;
- ResponseId responseMark;
- };
-
- static Data& getData(const AMQBody::shared_ptr& body) {
- return boost::dynamic_pointer_cast<AMQRequestBody>(body)->getData();
- }
-
- static shared_ptr create(
- AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
- Buffer& buffer);
-
- AMQRequestBody(ProtocolVersion v, RequestId id=0, ResponseId mark=0)
- : AMQMethodBody(v), data(id, mark) {}
-
- uint8_t type() const { return REQUEST_BODY; }
- void encode(Buffer& buffer) const;
-
- Data& getData() { return data; }
- RequestId getRequestId() const { return data.requestId; }
- ResponseId getResponseMark() const { return data.responseMark; }
- void setRequestId(RequestId id) { data.requestId=id; }
- void setResponseMark(ResponseId mark) { data.responseMark=mark; }
-
- bool isRequest()const { return true; }
- static const uint32_t baseSize() { return AMQMethodBody::baseSize()+20; }
- protected:
- void printPrefix(std::ostream& out) const;
-
- private:
- Data data;
-};
-
-}} // namespace qpid::framing
-
-
-
-#endif /*!_framing_AMQRequestBody_h*/
diff --git a/qpid/cpp/src/qpid/framing/AMQResponseBody.cpp b/qpid/cpp/src/qpid/framing/AMQResponseBody.cpp
deleted file mode 100644
index 4354a9c564..0000000000
--- a/qpid/cpp/src/qpid/framing/AMQResponseBody.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "AMQFrame.h"
-#include "AMQResponseBody.h"
-#include "qpid/framing/AMQP_MethodVersionMap.h"
-
-namespace qpid {
-namespace framing {
-
-void AMQResponseBody::Data::encode(Buffer& buffer) const {
- buffer.putLongLong(responseId);
- buffer.putLongLong(requestId);
- buffer.putLong(batchOffset);
-}
-
-void AMQResponseBody::Data::decode(Buffer& buffer) {
- responseId = buffer.getLongLong();
- requestId = buffer.getLongLong();
- batchOffset = buffer.getLong();
-}
-
-void AMQResponseBody::encode(Buffer& buffer) const {
- data.encode(buffer);
- encodeId(buffer);
- encodeContent(buffer);
-}
-
-AMQResponseBody::shared_ptr AMQResponseBody::create(
- AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
- Buffer& buffer)
-{
- ClassMethodId id;
- Data data;
- data.decode(buffer);
- id.decode(buffer);
- AMQResponseBody* body = dynamic_cast<AMQResponseBody*>(
- versionMap.createMethodBody(
- id.classId, id.methodId, version.getMajor(), version.getMinor()));
- assert(body);
- body->data = data;
- return AMQResponseBody::shared_ptr(body);
-}
-
-void AMQResponseBody::printPrefix(std::ostream& out) const {
- out << "response(id=" << data.responseId << ",request=" << data.requestId
- << ",batch=" << data.batchOffset << "): ";
-}
-
-}} // namespace qpid::framing
diff --git a/qpid/cpp/src/qpid/framing/AMQResponseBody.h b/qpid/cpp/src/qpid/framing/AMQResponseBody.h
deleted file mode 100644
index fa381baddd..0000000000
--- a/qpid/cpp/src/qpid/framing/AMQResponseBody.h
+++ /dev/null
@@ -1,85 +0,0 @@
-#ifndef _framing_AMQResponseBody_h
-#define _framing_AMQResponseBody_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "AMQMethodBody.h"
-
-namespace qpid {
-namespace framing {
-
-class AMQP_MethodVersionMap;
-
-/**
- * Body of a response method frame.
- */
-class AMQResponseBody : public AMQMethodBody
-{
-
- public:
- typedef boost::shared_ptr<AMQResponseBody> shared_ptr;
-
- struct Data {
- Data(ResponseId id=0, RequestId req=0, BatchOffset off=0)
- : responseId(id), requestId(req), batchOffset(off) {}
- void encode(Buffer&) const;
- void decode(Buffer&);
-
- uint64_t responseId;
- uint64_t requestId;
- uint32_t batchOffset;
- };
-
- static Data& getData(const AMQBody::shared_ptr& body) {
- return boost::dynamic_pointer_cast<AMQResponseBody>(body)->getData();
- }
-
- static shared_ptr create(
- AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
- Buffer& buffer);
-
- AMQResponseBody(
- ProtocolVersion v, ResponseId id=0, RequestId req=0, BatchOffset off=0)
- : AMQMethodBody(v), data(id, req, off) {}
-
- uint8_t type() const { return RESPONSE_BODY; }
- void encode(Buffer& buffer) const;
-
- Data& getData() { return data; }
- ResponseId getResponseId() const { return data.responseId; }
- RequestId getRequestId() const { return data.requestId; }
- BatchOffset getBatchOffset() const { return data.batchOffset; }
- void setResponseId(ResponseId id) { data.responseId = id; }
- void setRequestId(RequestId id) { data.requestId = id; }
- void setBatchOffset(BatchOffset id) { data.batchOffset = id; }
-
- bool isResponse() const { return true; }
- protected:
- static const uint32_t baseSize() { return AMQMethodBody::baseSize()+20; }
- void printPrefix(std::ostream& out) const;
-
- private:
- Data data;
-};
-
-}} // namespace qpid::framing
-
-
-
-#endif /*!_framing_AMQResponseBody_h*/
diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.cpp b/qpid/cpp/src/qpid/framing/BodyHandler.cpp
index 0405b39e5d..7a7bf22f15 100644
--- a/qpid/cpp/src/qpid/framing/BodyHandler.cpp
+++ b/qpid/cpp/src/qpid/framing/BodyHandler.cpp
@@ -20,8 +20,6 @@
*/
#include "qpid/QpidError.h"
#include "BodyHandler.h"
-#include "AMQRequestBody.h"
-#include "AMQResponseBody.h"
#include "AMQMethodBody.h"
#include "AMQHeaderBody.h"
#include "AMQContentBody.h"
@@ -35,12 +33,6 @@ BodyHandler::~BodyHandler() {}
void BodyHandler::handleBody(shared_ptr<AMQBody> body) {
switch(body->type())
{
- case REQUEST_BODY:
- handleRequest(shared_polymorphic_cast<AMQRequestBody>(body));
- break;
- case RESPONSE_BODY:
- handleResponse(shared_polymorphic_cast<AMQResponseBody>(body));
- break;
case METHOD_BODY:
handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
break;
diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.h b/qpid/cpp/src/qpid/framing/BodyHandler.h
index cb3f0997b0..07d1658afa 100644
--- a/qpid/cpp/src/qpid/framing/BodyHandler.h
+++ b/qpid/cpp/src/qpid/framing/BodyHandler.h
@@ -24,14 +24,9 @@
#include <boost/shared_ptr.hpp>
-#include "Requester.h"
-#include "Responder.h"
-
namespace qpid {
namespace framing {
-
-class AMQRequestBody;
-class AMQResponseBody;
+class AMQBody;
class AMQMethodBody;
class AMQHeaderBody;
class AMQContentBody;
@@ -47,8 +42,6 @@ class BodyHandler {
virtual void handleBody(boost::shared_ptr<AMQBody> body);
protected:
- virtual void handleRequest(boost::shared_ptr<AMQRequestBody>) = 0;
- virtual void handleResponse(boost::shared_ptr<AMQResponseBody>) = 0;
virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0;
virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0;
virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0;
diff --git a/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp b/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp
index 4ee834b561..b3b442004a 100644
--- a/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp
+++ b/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp
@@ -44,55 +44,15 @@ void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v)
handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out));
}
-RequestId ChannelAdapter::send(
- shared_ptr<AMQBody> body, Correlator::Action action)
+RequestId ChannelAdapter::send(shared_ptr<AMQBody> body)
{
RequestId requestId = 0;
assertChannelOpen();
- switch (body->type()) {
- case REQUEST_BODY: {
- AMQRequestBody::shared_ptr request =
- boost::shared_polymorphic_downcast<AMQRequestBody>(body);
- requester.sending(request->getData());
- requestId = request->getData().requestId;
- if (!action.empty())
- correlator.request(requestId, action);
- break;
- }
- case RESPONSE_BODY: {
- AMQResponseBody::shared_ptr response =
- boost::shared_polymorphic_downcast<AMQResponseBody>(body);
- responder.sending(response->getData());
- break;
- }
- // No action required for other body types.
- }
AMQFrame frame(getVersion(), getId(), body);
handlers.out->handle(frame);
return requestId;
}
-void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
- assertMethodOk(*request);
- AMQRequestBody::Data& requestData = request->getData();
- responder.received(requestData);
- handleMethodInContext(request, MethodContext(this, request));
-}
-
-void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
- assertMethodOk(*response);
- AMQResponseBody::Data& responseData = response->getData();
-
- // FIXME aconway 2007-04-05: processed should be last
- // but causes problems with InProcessBroker tests because
- // we execute client code in handleMethod.
- // Need to introduce a queue & 2 threads for inprocess.
- requester.processed(responseData);
- // FIXME aconway 2007-04-04: exception handling.
- correlator.response(response);
- handleMethod(response);
-}
-
void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) {
assertMethodOk(*method);
handleMethodInContext(method, MethodContext(this, method));
diff --git a/qpid/cpp/src/qpid/framing/ChannelAdapter.h b/qpid/cpp/src/qpid/framing/ChannelAdapter.h
index a7c9c61640..84a626c864 100644
--- a/qpid/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/qpid/cpp/src/qpid/framing/ChannelAdapter.h
@@ -27,9 +27,7 @@
#include "qpid/shared_ptr.h"
#include "BodyHandler.h"
-#include "Requester.h"
-#include "Responder.h"
-#include "Correlator.h"
+#include "ProtocolVersion.h"
#include "amqp_types.h"
#include "FrameHandler.h"
@@ -37,6 +35,7 @@ namespace qpid {
namespace framing {
class MethodContext;
+class OutputHandler;
/**
* Base class for client and broker channels.
@@ -71,19 +70,12 @@ class ChannelAdapter : protected BodyHandler {
/**
* Send a frame.
*@param body Body of the frame.
- *@param action optional action to execute when we receive a
- *response to this frame. Ignored if body is not a Request.
*@return If body is a request, the ID assigned else 0.
*/
- virtual RequestId send(shared_ptr<AMQBody> body,
- Correlator::Action action=Correlator::Action());
+ virtual RequestId send(shared_ptr<AMQBody> body);
virtual bool isOpen() const = 0;
- RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
- RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
- RequestId getNextSendRequestId() { return requester.getNextId(); }
-
protected:
void assertMethodOk(AMQMethodBody& method) const;
void assertChannelOpen() const;
@@ -98,14 +90,9 @@ class ChannelAdapter : protected BodyHandler {
friend class ChannelAdapterHandler;
void handleMethod(shared_ptr<AMQMethodBody>);
- void handleRequest(shared_ptr<AMQRequestBody>);
- void handleResponse(shared_ptr<AMQResponseBody>);
ChannelId id;
ProtocolVersion version;
- Requester requester;
- Responder responder;
- Correlator correlator;
FrameHandler::Chains handlers;
};
diff --git a/qpid/cpp/src/qpid/framing/Correlator.cpp b/qpid/cpp/src/qpid/framing/Correlator.cpp
deleted file mode 100644
index feeb3aa935..0000000000
--- a/qpid/cpp/src/qpid/framing/Correlator.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "Correlator.h"
-
-namespace qpid {
-namespace framing {
-
-void Correlator::request(RequestId id, Action action) {
- actions[id] = action;
-}
-
-bool Correlator::response(shared_ptr<AMQResponseBody> r) {
- Actions::iterator begin = actions.lower_bound(r->getRequestId());
- RequestId last = r->getRequestId()+r->getBatchOffset();
- Actions::iterator i = begin;
- bool didAction = false;
- for( ; i != actions.end() && i->first <= last; ++i) {
- didAction = true;
- // FIXME aconway 2007-04-04: handle exceptions thrown by action.
- i->second(r);
- }
- actions.erase(begin, i);
- return didAction;
-}
-
-
-}} // namespace qpid::framing
diff --git a/qpid/cpp/src/qpid/framing/Correlator.h b/qpid/cpp/src/qpid/framing/Correlator.h
deleted file mode 100644
index 35dd7514ae..0000000000
--- a/qpid/cpp/src/qpid/framing/Correlator.h
+++ /dev/null
@@ -1,68 +0,0 @@
-#ifndef _framing_Correlator_h
-#define _framing_Correlator_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/shared_ptr.h"
-#include "qpid/framing/AMQResponseBody.h"
-#include <boost/function.hpp>
-#include <map>
-
-namespace qpid {
-namespace framing {
-
-/**
- * Correlate responses with actions established when sending the request.
- *
- * THREAD UNSAFE.
- */
-class Correlator
-{
- public:
- typedef shared_ptr<AMQResponseBody> ResponsePtr;
- typedef boost::function<void (ResponsePtr)> Action;
-
- /**
- * Note that request with id was sent, record an action to call
- * when a response arrives.
- */
- void request(RequestId id, Action doOnResponse);
-
- /**
- * Note response received, call action for associated request if any.
- * Return true of some action(s) were executed.
- */
- bool response(shared_ptr<AMQResponseBody>);
-
- /**
- * Note the given execution mark was received, call actions
- * for any requests that are impicitly responded to.
- */
- void mark(RequestId mark);
-
- private:
- typedef std::map<RequestId, Action> Actions;
- Actions actions;
-};
-
-}} // namespace qpid::framing
-
-
-
-#endif /*!_framing_Correlator_h*/
diff --git a/qpid/cpp/src/qpid/framing/Frame.cpp b/qpid/cpp/src/qpid/framing/Frame.cpp
index 7bdc0adf00..1ba8112faa 100644
--- a/qpid/cpp/src/qpid/framing/Frame.cpp
+++ b/qpid/cpp/src/qpid/framing/Frame.cpp
@@ -22,8 +22,6 @@
#include "Frame.h"
#include "qpid/QpidError.h"
-#include "AMQRequestBody.h"
-#include "AMQResponseBody.h"
namespace qpid {
diff --git a/qpid/cpp/src/qpid/framing/FramingContent.h b/qpid/cpp/src/qpid/framing/FramingContent.h
index c813408cf3..9315a7716f 100644
--- a/qpid/cpp/src/qpid/framing/FramingContent.h
+++ b/qpid/cpp/src/qpid/framing/FramingContent.h
@@ -26,6 +26,8 @@
namespace qpid {
namespace framing {
+class Buffer;
+
enum discriminator_types { INLINE = 0, REFERENCE = 1 };
/**
diff --git a/qpid/cpp/src/qpid/framing/MethodContext.cpp b/qpid/cpp/src/qpid/framing/MethodContext.cpp
deleted file mode 100644
index 9dc42dcfd7..0000000000
--- a/qpid/cpp/src/qpid/framing/MethodContext.cpp
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "MethodContext.h"
-#include "amqp_types.h"
-#include "AMQRequestBody.h"
-
-namespace qpid {
-namespace framing {
-
-RequestId MethodContext::getRequestId() const {
- if (methodBody->type() == REQUEST_BODY) {
- return boost::shared_polymorphic_cast<AMQRequestBody>(methodBody)->getRequestId();
- } else {
- return 0;
- }
-}
-
-}} // namespace qpid::framing
diff --git a/qpid/cpp/src/qpid/framing/MethodContext.h b/qpid/cpp/src/qpid/framing/MethodContext.h
index 80e4c55d7e..102dc279d4 100644
--- a/qpid/cpp/src/qpid/framing/MethodContext.h
+++ b/qpid/cpp/src/qpid/framing/MethodContext.h
@@ -59,12 +59,6 @@ struct MethodContext
* It's also provides the request ID when constructing a response.
*/
BodyPtr methodBody;
-
- /**
- * Return methodBody's request ID.
- * It is an error to call this if methodBody is not a request.
- */
- RequestId getRequestId() const;
};
diff --git a/qpid/cpp/src/qpid/framing/Requester.cpp b/qpid/cpp/src/qpid/framing/Requester.cpp
deleted file mode 100644
index a675f0a61b..0000000000
--- a/qpid/cpp/src/qpid/framing/Requester.cpp
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <boost/format.hpp>
-
-#include "Requester.h"
-#include "qpid/QpidError.h"
-
-namespace qpid {
-namespace framing {
-
-Requester::Requester() : lastId(0), responseMark(0) {}
-
-void Requester::sending(AMQRequestBody::Data& request) {
- request.requestId = ++lastId;
- request.responseMark = responseMark;
-}
-
-void Requester::processed(const AMQResponseBody::Data& response) {
- responseMark = response.responseId;
- firstAckRequest = response.requestId;
- lastAckRequest = firstAckRequest + response.batchOffset;
-}
-
-}} // namespace qpid::framing
diff --git a/qpid/cpp/src/qpid/framing/Requester.h b/qpid/cpp/src/qpid/framing/Requester.h
deleted file mode 100644
index 65bdc9a5a1..0000000000
--- a/qpid/cpp/src/qpid/framing/Requester.h
+++ /dev/null
@@ -1,68 +0,0 @@
-#ifndef _framing_Requester_h
-#define _framing_Requester_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <set>
-#include "AMQRequestBody.h"
-#include "AMQResponseBody.h"
-
-namespace qpid {
-namespace framing {
-
-class AMQRequestBody;
-class AMQResponseBody;
-
-/**
- * Manage request IDs and the response mark for locally initiated requests.
- *
- * THREAD UNSAFE: must be locked externally.
- */
-class Requester
-{
- public:
- Requester();
-
- /** Called before sending a request to set request data. */
- void sending(AMQRequestBody::Data&);
-
- /** Called after processing a response. */
- void processed(const AMQResponseBody::Data&);
-
- /** Get the next request id to be used. */
- RequestId getNextId() { return lastId + 1; }
-
- /** Get the first request acked by last response */
- RequestId getFirstAckRequest() { return firstAckRequest; }
-
- /** Get the last request acked by last response */
- RequestId getLastAckRequest() { return lastAckRequest; }
-
- private:
- RequestId lastId;
- ResponseId responseMark;
- ResponseId firstAckRequest;
- ResponseId lastAckRequest;
-};
-
-}} // namespace qpid::framing
-
-
-
-#endif /*!_framing_Requester_h*/
diff --git a/qpid/cpp/src/qpid/framing/Responder.cpp b/qpid/cpp/src/qpid/framing/Responder.cpp
deleted file mode 100644
index 1b9c8a6c59..0000000000
--- a/qpid/cpp/src/qpid/framing/Responder.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <boost/format.hpp>
-
-#include "Responder.h"
-#include "qpid/QpidError.h"
-
-namespace qpid {
-namespace framing {
-
-Responder::Responder() : lastId(0), responseMark(0) {}
-
-void Responder::received(const AMQRequestBody::Data& request) {
- if (request.responseMark < responseMark || request.responseMark > lastId)
- THROW_QPID_ERROR(
- PROTOCOL_ERROR, boost::format("Invalid response mark %d.")
- %request.responseMark);
- responseMark = request.responseMark;
-}
-
-void Responder::sending(AMQResponseBody::Data& response) {
- response.responseId = ++lastId;
- assert(response.requestId); // Should be already set.
- response.batchOffset = 0;
-}
-
-}} // namespace qpid::framing
-
diff --git a/qpid/cpp/src/qpid/framing/Responder.h b/qpid/cpp/src/qpid/framing/Responder.h
deleted file mode 100644
index 0e1785256b..0000000000
--- a/qpid/cpp/src/qpid/framing/Responder.h
+++ /dev/null
@@ -1,61 +0,0 @@
-#ifndef _framing_Responder_h
-#define _framing_Responder_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "AMQRequestBody.h"
-#include "AMQResponseBody.h"
-
-namespace qpid {
-namespace framing {
-
-/**
- * Manage response ids and response mark remotely initianted requests.
- *
- * THREAD UNSAFE: This class is called as frames are sent or received
- * sequentially on a connection, so it does not need to be thread safe.
- */
-class Responder
-{
- public:
- Responder();
-
- /** Called after receiving a request. */
- void received(const AMQRequestBody::Data& request);
-
- /** Called before sending a response to set respose data. */
- void sending(AMQResponseBody::Data& response);
-
- /** Get the ID of the highest response acknowledged by the peer. */
- ResponseId getResponseMark() { return responseMark; }
-
- // TODO aconway 2007-01-14: Batching support - store unsent
- // Response for equality comparison with subsequent responses.
- //
-
- private:
- ResponseId lastId;
- ResponseId responseMark;
-};
-
-}} // namespace qpid::framing
-
-
-
-#endif /*!_framing_Responder_h*/
diff --git a/qpid/cpp/src/tests/FramingTest.cpp b/qpid/cpp/src/tests/FramingTest.cpp
index f172d1765e..b0aeb9db6f 100644
--- a/qpid/cpp/src/tests/FramingTest.cpp
+++ b/qpid/cpp/src/tests/FramingTest.cpp
@@ -27,16 +27,11 @@
#include <typeinfo>
#include "qpid/QpidError.h"
#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/AMQRequestBody.h"
-#include "qpid/framing/AMQResponseBody.h"
-#include "qpid/framing/Requester.h"
-#include "qpid/framing/Responder.h"
#include "InProcessBroker.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Connector.h"
#include "qpid/client/ClientExchange.h"
#include "qpid/client/ClientQueue.h"
-#include "qpid/framing/Correlator.h"
#include "qpid/framing/BasicGetOkBody.h"
#include <memory>
#include <boost/lexical_cast.hpp>
@@ -64,11 +59,6 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_TEST(testBasicConsumeBody);
CPPUNIT_TEST(testConnectionRedirectBodyFrame);
CPPUNIT_TEST(testBasicConsumeOkBodyFrame);
- CPPUNIT_TEST(testRequestBodyFrame);
- CPPUNIT_TEST(testResponseBodyFrame);
- CPPUNIT_TEST(testRequester);
- CPPUNIT_TEST(testResponder);
- CPPUNIT_TEST(testCorrelator);
CPPUNIT_TEST(testInlineContent);
CPPUNIT_TEST(testContentReference);
CPPUNIT_TEST(testContentValidation);
@@ -168,32 +158,6 @@ class FramingTest : public CppUnit::TestCase
}
}
- void testRequestBodyFrame() {
- std::string testing("testing");
- AMQBody::shared_ptr request(new ChannelOpenBody(version, testing));
- AMQFrame in(version, 999, request);
- in.encode(buffer);
- buffer.flip();
- AMQFrame out;
- out.decode(buffer);
- ChannelOpenBody* decoded =
- dynamic_cast<ChannelOpenBody*>(out.getBody().get());
- CPPUNIT_ASSERT(decoded);
- CPPUNIT_ASSERT_EQUAL(testing, decoded->getOutOfBand());
- }
-
- void testResponseBodyFrame() {
- AMQBody::shared_ptr response(new ChannelOpenOkBody(version));
- AMQFrame in(version, 999, response);
- in.encode(buffer);
- buffer.flip();
- AMQFrame out;
- out.decode(buffer);
- ChannelOpenOkBody* decoded =
- dynamic_cast<ChannelOpenOkBody*>(out.getBody().get());
- CPPUNIT_ASSERT(decoded);
- }
-
void testInlineContent() {
Content content(INLINE, "MyData");
CPPUNIT_ASSERT(content.isInline());
@@ -247,140 +211,6 @@ class FramingTest : public CppUnit::TestCase
}
- void testRequester() {
- Requester r;
- AMQRequestBody::Data q;
- AMQResponseBody::Data p;
-
- r.sending(q);
- CPPUNIT_ASSERT_EQUAL(RequestId(1), q.requestId);
- CPPUNIT_ASSERT_EQUAL(ResponseId(0), q.responseMark);
-
- r.sending(q);
- CPPUNIT_ASSERT_EQUAL(RequestId(2), q.requestId);
- CPPUNIT_ASSERT_EQUAL(ResponseId(0), q.responseMark);
-
- // Now process a response
- p.responseId = 1;
- p.requestId = 2;
- r.processed(AMQResponseBody::Data(1, 2));
-
- r.sending(q);
- CPPUNIT_ASSERT_EQUAL(RequestId(3), q.requestId);
- CPPUNIT_ASSERT_EQUAL(ResponseId(1), q.responseMark);
-
- try {
- r.processed(p); // Already processed this response.
- CPPUNIT_FAIL("Expected exception");
- } catch (...) {}
-
- try {
- p.requestId = 50;
- r.processed(p); // No such request
- CPPUNIT_FAIL("Expected exception");
- } catch (...) {}
-
- r.sending(q); // reqId=4
- r.sending(q); // reqId=5
- r.sending(q); // reqId=6
- p.responseId++;
- p.requestId = 4;
- p.batchOffset = 2;
- r.processed(p);
- r.sending(q);
- CPPUNIT_ASSERT_EQUAL(RequestId(7), q.requestId);
- CPPUNIT_ASSERT_EQUAL(ResponseId(2), q.responseMark);
-
- p.responseId++;
- p.requestId = 1; // Out of order
- p.batchOffset = 0;
- r.processed(p);
- r.sending(q);
- CPPUNIT_ASSERT_EQUAL(RequestId(8), q.requestId);
- CPPUNIT_ASSERT_EQUAL(ResponseId(3), q.responseMark);
- }
-
- void testResponder() {
- Responder r;
- AMQRequestBody::Data q;
- AMQResponseBody::Data p;
-
- q.requestId = 1;
- q.responseMark = 0;
- r.received(q);
- p.requestId = q.requestId;
- r.sending(p);
- CPPUNIT_ASSERT_EQUAL(ResponseId(1), p.responseId);
- CPPUNIT_ASSERT_EQUAL(RequestId(1), p.requestId);
- CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset);
- CPPUNIT_ASSERT_EQUAL(ResponseId(0), r.getResponseMark());
-
- q.requestId++;
- q.responseMark = 1;
- r.received(q);
- r.sending(p);
- CPPUNIT_ASSERT_EQUAL(ResponseId(2), p.responseId);
- CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset);
- CPPUNIT_ASSERT_EQUAL(ResponseId(1), r.getResponseMark());
-
- try {
- // Response mark higher any request ID sent.
- q.responseMark = 3;
- r.received(q);
- } catch(...) {}
-
- try {
- // Response mark lower than previous response mark.
- q.responseMark = 0;
- r.received(q);
- } catch(...) {}
-
- // TODO aconway 2007-01-14: Test for batching when supported.
-
- }
-
-
- std::vector<Correlator::ResponsePtr> correlations;
-
- void correlatorCallback(Correlator::ResponsePtr r) {
- correlations.push_back(r);
- }
-
- struct DummyResponse : public AMQResponseBody {
- DummyResponse(ResponseId id=0, RequestId req=0, BatchOffset off=0)
- : AMQResponseBody(version, id, req, off) {}
- uint32_t size() const { return 0; }
- void print(std::ostream&) const {}
- MethodId amqpMethodId() const { return 0; }
- ClassId amqpClassId() const { return 0; }
- void encodeContent(Buffer& ) const {}
- void decodeContent(Buffer& ) {}
- };
-
- void testCorrelator() {
- CPPUNIT_ASSERT(correlations.empty());
- Correlator c;
- Correlator::Action action = boost::bind(&FramingTest::correlatorCallback, this, _1);
- c.request(5, action);
- Correlator::ResponsePtr r1(new DummyResponse(3, 5, 0));
- CPPUNIT_ASSERT(c.response(r1));
- CPPUNIT_ASSERT_EQUAL(size_t(1), correlations.size());
- CPPUNIT_ASSERT(correlations.front() == r1);
- correlations.clear();
-
- c.request(6, action);
- c.request(7, action);
- c.request(8, action);
- Correlator::ResponsePtr r2(new DummyResponse(4, 6, 3));
- CPPUNIT_ASSERT(c.response(r2));
- CPPUNIT_ASSERT_EQUAL(size_t(3), correlations.size());
- CPPUNIT_ASSERT(r2 == correlations[0]);
- CPPUNIT_ASSERT(r2 == correlations[1]);
- CPPUNIT_ASSERT(r2 == correlations[2]);
- Correlator::ResponsePtr r3(new DummyResponse(5, 99, 0));
- CPPUNIT_ASSERT(!c.response(r3));
- }
-
// expect may contain null chars so use string(ptr,size) constructor
// Use sizeof(expect)-1 to strip the trailing null.
#define ASSERT_FRAME(expect, frame) \