diff options
author | Alan Conway <aconway@apache.org> | 2007-08-14 14:29:07 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-08-14 14:29:07 +0000 |
commit | 284aec62741c7dc069c8e7199a5ce3bf61c277ff (patch) | |
tree | 774d7dfd17a7a0ca562f1a0788b028984d6f0b3a | |
parent | 9d660da0bad7c37cb55d4037711265fd0b3b7e9d (diff) | |
download | qpid-python-284aec62741c7dc069c8e7199a5ce3bf61c277ff.tar.gz |
Deleted following files that are obsolete for 0-10:
src/qpid/framing/AMQRequestBody.cpp
src/qpid/framing/AMQRequestBody.h
src/qpid/framing/AMQResponseBody.cpp
src/qpid/framing/AMQResponseBody.h
src/qpid/framing/Correlator.cpp
src/qpid/framing/Correlator.h
src/qpid/framing/MethodContext.cpp
src/qpid/framing/Requester.cpp
src/qpid/framing/Requester.h
src/qpid/framing/Responder.cpp
src/qpid/framing/Responder.h
Made changes to support their deletion.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@565770 13f79535-47bb-0310-9956-ffa450edef68
26 files changed, 14 insertions, 932 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 118a60ec70..690f939e71 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -135,15 +135,12 @@ libqpidcommon_la_LIBADD = \ libqpidcommon_la_SOURCES = \ $(platform_src) \ qpid/framing/AMQBody.cpp \ - qpid/framing/AMQRequestBody.cpp \ - qpid/framing/AMQResponseBody.cpp \ qpid/framing/AMQContentBody.cpp \ qpid/framing/AMQFrame.cpp \ qpid/framing/AMQHeaderBody.cpp \ qpid/framing/AMQHeartbeatBody.cpp \ qpid/framing/AMQMethodBody.cpp \ qpid/framing/FrameHandler.h \ - qpid/framing/MethodContext.cpp \ qpid/framing/BasicHeaderProperties.cpp \ qpid/framing/BodyHandler.cpp \ qpid/framing/ChannelAdapter.cpp \ @@ -154,11 +151,8 @@ libqpidcommon_la_SOURCES = \ qpid/framing/ProtocolInitiation.cpp \ qpid/framing/ProtocolVersion.cpp \ qpid/framing/ProtocolVersionException.cpp \ - qpid/framing/Requester.cpp \ - qpid/framing/Responder.cpp \ qpid/framing/SequenceNumber.cpp \ qpid/framing/SequenceNumberSet.cpp \ - qpid/framing/Correlator.cpp \ qpid/framing/Value.cpp \ qpid/framing/Proxy.cpp \ qpid/framing/Uuid.cpp \ @@ -366,13 +360,10 @@ nobase_include_HEADERS = \ qpid/framing/AMQHeaderBody.h \ qpid/framing/AMQHeartbeatBody.h \ qpid/framing/AMQMethodBody.h \ - qpid/framing/AMQRequestBody.h \ - qpid/framing/AMQResponseBody.h \ qpid/framing/BasicHeaderProperties.h \ qpid/framing/BodyHandler.h \ qpid/framing/Buffer.h \ qpid/framing/ChannelAdapter.h \ - qpid/framing/Correlator.h \ qpid/framing/FieldTable.h \ qpid/framing/FramingContent.h \ qpid/framing/HeaderProperties.h \ @@ -385,8 +376,6 @@ nobase_include_HEADERS = \ qpid/framing/ProtocolVersion.h \ qpid/framing/ProtocolVersionException.h \ qpid/framing/Proxy.h \ - qpid/framing/Requester.h \ - qpid/framing/Responder.h \ qpid/framing/SerializeHandler.h \ qpid/framing/SequenceNumber.h \ qpid/framing/SequenceNumberSet.h \ diff --git a/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp b/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp index 65933660f1..4fa4b2c238 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp @@ -50,14 +50,10 @@ void ConnectionAdapter::handleMethodInContext( ) { try{ - handler->client.setResponseTo(context.getRequestId()); method->invoke(*this, context); - handler->client.setResponseTo(0); }catch(ConnectionException& e){ - handler->client.setResponseTo(0); handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ - handler->client.setResponseTo(0); handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } diff --git a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp index c728a800ab..70f7c3b8ec 100644 --- a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -162,10 +162,9 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context) MessageTransferBody::shared_ptr transfer( boost::shared_polymorphic_downcast<MessageTransferBody>( context.methodBody)); - RequestId requestId = context.getRequestId(); if (transfer->getBody().isInline()) { - MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer)); + MessageMessage::shared_ptr message(new MessageMessage(&connection, 0, transfer)); channel.handleInlineTransfer(message); } else { throw ConnectionException(540, "References no longer supported"); diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp index 08f91bf69a..fd0a5cfbe1 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp @@ -171,16 +171,16 @@ void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ msg->deliver(*this, tag, token, connection.getFrameMax()); } -RequestId SemanticHandler::send(shared_ptr<AMQBody> body, Correlator::Action action) +RequestId SemanticHandler::send(shared_ptr<AMQBody> body) { Mutex::ScopedLock l(outLock); uint8_t type(body->type()); - if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) { + if (type == METHOD_BODY) { //temporary hack until channel management is moved to its own handler: if (dynamic_pointer_cast<AMQMethodBody>(body)->amqpClassId() != ChannelOpenBody::CLASS_ID) { ++outgoing.hwm; //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl; } } - return ChannelAdapter::send(body, action); + return ChannelAdapter::send(body); } diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h index b863b3486e..7d5d95243e 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.h +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h @@ -59,7 +59,7 @@ class SemanticHandler : private framing::ChannelAdapter, void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); - framing::RequestId send(shared_ptr<framing::AMQBody> body, framing::Correlator::Action action=framing::Correlator::Action()); + framing::RequestId send(shared_ptr<framing::AMQBody> body); //delivery adapter methods: diff --git a/qpid/cpp/src/qpid/cluster/SessionManager.cpp b/qpid/cpp/src/qpid/cluster/SessionManager.cpp index c9e79b4bbc..715ed7817c 100644 --- a/qpid/cpp/src/qpid/cluster/SessionManager.cpp +++ b/qpid/cpp/src/qpid/cluster/SessionManager.cpp @@ -67,7 +67,7 @@ using namespace broker; virtual bool isOpen() const{ return true; } virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){} // No-op send. - virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; } + virtual RequestId send(shared_ptr<AMQBody>) { return 0; } //delivery adapter methods, also no-ops: virtual DeliveryId deliver(Message::shared_ptr&, DeliveryToken::shared_ptr) { return 0; } diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp index 778c9ab505..f79eae3524 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp +++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp @@ -23,8 +23,7 @@ #include "AMQFrame.h" #include "qpid/QpidError.h" -#include "AMQRequestBody.h" -#include "AMQResponseBody.h" +#include "AMQMethodBody.h" namespace qpid { @@ -91,12 +90,6 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) case METHOD_BODY: body = AMQMethodBody::create(versionMap, version, buffer); break; - case REQUEST_BODY: - body = AMQRequestBody::create(versionMap, version, buffer); - break; - case RESPONSE_BODY: - body = AMQResponseBody::create(versionMap, version, buffer); - break; case HEADER_BODY: body = AMQBody::shared_ptr(new AMQHeaderBody()); break; diff --git a/qpid/cpp/src/qpid/framing/AMQRequestBody.cpp b/qpid/cpp/src/qpid/framing/AMQRequestBody.cpp deleted file mode 100644 index 9e7b307a47..0000000000 --- a/qpid/cpp/src/qpid/framing/AMQRequestBody.cpp +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "AMQRequestBody.h" -#include "qpid/framing/AMQP_MethodVersionMap.h" - -namespace qpid { -namespace framing { - -void AMQRequestBody::Data::encode(Buffer& buffer) const { - buffer.putLongLong(requestId); - buffer.putLongLong(responseMark); - buffer.putLong(0); // Reserved long in spec. -} - -void AMQRequestBody::Data::decode(Buffer& buffer) { - requestId = buffer.getLongLong(); - responseMark = buffer.getLongLong(); - buffer.getLong(); // Ignore reserved long. -} - -void AMQRequestBody::encode(Buffer& buffer) const { - data.encode(buffer); - encodeId(buffer); - encodeContent(buffer); -} - -AMQRequestBody::shared_ptr -AMQRequestBody::create( - AMQP_MethodVersionMap& versionMap, ProtocolVersion version, - Buffer& buffer) -{ - ClassMethodId id; - Data data; - data.decode(buffer); - id.decode(buffer); - AMQRequestBody* body = dynamic_cast<AMQRequestBody*>( - versionMap.createMethodBody( - id.classId, id.methodId, version.getMajor(), version.getMinor())); - assert(body); - body->data = data; - return AMQRequestBody::shared_ptr(body); -} - -void AMQRequestBody::printPrefix(std::ostream& out) const { - out << "request(id=" << data.requestId << ",mark=" - << data.responseMark << "): "; -} - -}} // namespace qpid::framing - diff --git a/qpid/cpp/src/qpid/framing/AMQRequestBody.h b/qpid/cpp/src/qpid/framing/AMQRequestBody.h deleted file mode 100644 index 691a4c8768..0000000000 --- a/qpid/cpp/src/qpid/framing/AMQRequestBody.h +++ /dev/null @@ -1,78 +0,0 @@ -#ifndef _framing_AMQRequestBody_h -#define _framing_AMQRequestBody_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/AMQMethodBody.h" - -namespace qpid { -namespace framing { - -/** - * Body of a request method frame. - */ -class AMQRequestBody : public AMQMethodBody -{ - public: - typedef boost::shared_ptr<AMQRequestBody> shared_ptr; - - struct Data { - Data(RequestId id=0, ResponseId mark=0) - : requestId(id), responseMark(mark) {} - void encode(Buffer&) const; - void decode(Buffer&); - - RequestId requestId; - ResponseId responseMark; - }; - - static Data& getData(const AMQBody::shared_ptr& body) { - return boost::dynamic_pointer_cast<AMQRequestBody>(body)->getData(); - } - - static shared_ptr create( - AMQP_MethodVersionMap& versionMap, ProtocolVersion version, - Buffer& buffer); - - AMQRequestBody(ProtocolVersion v, RequestId id=0, ResponseId mark=0) - : AMQMethodBody(v), data(id, mark) {} - - uint8_t type() const { return REQUEST_BODY; } - void encode(Buffer& buffer) const; - - Data& getData() { return data; } - RequestId getRequestId() const { return data.requestId; } - ResponseId getResponseMark() const { return data.responseMark; } - void setRequestId(RequestId id) { data.requestId=id; } - void setResponseMark(ResponseId mark) { data.responseMark=mark; } - - bool isRequest()const { return true; } - static const uint32_t baseSize() { return AMQMethodBody::baseSize()+20; } - protected: - void printPrefix(std::ostream& out) const; - - private: - Data data; -}; - -}} // namespace qpid::framing - - - -#endif /*!_framing_AMQRequestBody_h*/ diff --git a/qpid/cpp/src/qpid/framing/AMQResponseBody.cpp b/qpid/cpp/src/qpid/framing/AMQResponseBody.cpp deleted file mode 100644 index 4354a9c564..0000000000 --- a/qpid/cpp/src/qpid/framing/AMQResponseBody.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "AMQFrame.h" -#include "AMQResponseBody.h" -#include "qpid/framing/AMQP_MethodVersionMap.h" - -namespace qpid { -namespace framing { - -void AMQResponseBody::Data::encode(Buffer& buffer) const { - buffer.putLongLong(responseId); - buffer.putLongLong(requestId); - buffer.putLong(batchOffset); -} - -void AMQResponseBody::Data::decode(Buffer& buffer) { - responseId = buffer.getLongLong(); - requestId = buffer.getLongLong(); - batchOffset = buffer.getLong(); -} - -void AMQResponseBody::encode(Buffer& buffer) const { - data.encode(buffer); - encodeId(buffer); - encodeContent(buffer); -} - -AMQResponseBody::shared_ptr AMQResponseBody::create( - AMQP_MethodVersionMap& versionMap, ProtocolVersion version, - Buffer& buffer) -{ - ClassMethodId id; - Data data; - data.decode(buffer); - id.decode(buffer); - AMQResponseBody* body = dynamic_cast<AMQResponseBody*>( - versionMap.createMethodBody( - id.classId, id.methodId, version.getMajor(), version.getMinor())); - assert(body); - body->data = data; - return AMQResponseBody::shared_ptr(body); -} - -void AMQResponseBody::printPrefix(std::ostream& out) const { - out << "response(id=" << data.responseId << ",request=" << data.requestId - << ",batch=" << data.batchOffset << "): "; -} - -}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/AMQResponseBody.h b/qpid/cpp/src/qpid/framing/AMQResponseBody.h deleted file mode 100644 index fa381baddd..0000000000 --- a/qpid/cpp/src/qpid/framing/AMQResponseBody.h +++ /dev/null @@ -1,85 +0,0 @@ -#ifndef _framing_AMQResponseBody_h -#define _framing_AMQResponseBody_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "AMQMethodBody.h" - -namespace qpid { -namespace framing { - -class AMQP_MethodVersionMap; - -/** - * Body of a response method frame. - */ -class AMQResponseBody : public AMQMethodBody -{ - - public: - typedef boost::shared_ptr<AMQResponseBody> shared_ptr; - - struct Data { - Data(ResponseId id=0, RequestId req=0, BatchOffset off=0) - : responseId(id), requestId(req), batchOffset(off) {} - void encode(Buffer&) const; - void decode(Buffer&); - - uint64_t responseId; - uint64_t requestId; - uint32_t batchOffset; - }; - - static Data& getData(const AMQBody::shared_ptr& body) { - return boost::dynamic_pointer_cast<AMQResponseBody>(body)->getData(); - } - - static shared_ptr create( - AMQP_MethodVersionMap& versionMap, ProtocolVersion version, - Buffer& buffer); - - AMQResponseBody( - ProtocolVersion v, ResponseId id=0, RequestId req=0, BatchOffset off=0) - : AMQMethodBody(v), data(id, req, off) {} - - uint8_t type() const { return RESPONSE_BODY; } - void encode(Buffer& buffer) const; - - Data& getData() { return data; } - ResponseId getResponseId() const { return data.responseId; } - RequestId getRequestId() const { return data.requestId; } - BatchOffset getBatchOffset() const { return data.batchOffset; } - void setResponseId(ResponseId id) { data.responseId = id; } - void setRequestId(RequestId id) { data.requestId = id; } - void setBatchOffset(BatchOffset id) { data.batchOffset = id; } - - bool isResponse() const { return true; } - protected: - static const uint32_t baseSize() { return AMQMethodBody::baseSize()+20; } - void printPrefix(std::ostream& out) const; - - private: - Data data; -}; - -}} // namespace qpid::framing - - - -#endif /*!_framing_AMQResponseBody_h*/ diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.cpp b/qpid/cpp/src/qpid/framing/BodyHandler.cpp index 0405b39e5d..7a7bf22f15 100644 --- a/qpid/cpp/src/qpid/framing/BodyHandler.cpp +++ b/qpid/cpp/src/qpid/framing/BodyHandler.cpp @@ -20,8 +20,6 @@ */ #include "qpid/QpidError.h" #include "BodyHandler.h" -#include "AMQRequestBody.h" -#include "AMQResponseBody.h" #include "AMQMethodBody.h" #include "AMQHeaderBody.h" #include "AMQContentBody.h" @@ -35,12 +33,6 @@ BodyHandler::~BodyHandler() {} void BodyHandler::handleBody(shared_ptr<AMQBody> body) { switch(body->type()) { - case REQUEST_BODY: - handleRequest(shared_polymorphic_cast<AMQRequestBody>(body)); - break; - case RESPONSE_BODY: - handleResponse(shared_polymorphic_cast<AMQResponseBody>(body)); - break; case METHOD_BODY: handleMethod(shared_polymorphic_cast<AMQMethodBody>(body)); break; diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.h b/qpid/cpp/src/qpid/framing/BodyHandler.h index cb3f0997b0..07d1658afa 100644 --- a/qpid/cpp/src/qpid/framing/BodyHandler.h +++ b/qpid/cpp/src/qpid/framing/BodyHandler.h @@ -24,14 +24,9 @@ #include <boost/shared_ptr.hpp> -#include "Requester.h" -#include "Responder.h" - namespace qpid { namespace framing { - -class AMQRequestBody; -class AMQResponseBody; +class AMQBody; class AMQMethodBody; class AMQHeaderBody; class AMQContentBody; @@ -47,8 +42,6 @@ class BodyHandler { virtual void handleBody(boost::shared_ptr<AMQBody> body); protected: - virtual void handleRequest(boost::shared_ptr<AMQRequestBody>) = 0; - virtual void handleResponse(boost::shared_ptr<AMQResponseBody>) = 0; virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0; virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0; virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0; diff --git a/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp b/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp index 4ee834b561..b3b442004a 100644 --- a/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp +++ b/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp @@ -44,55 +44,15 @@ void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v) handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out)); } -RequestId ChannelAdapter::send( - shared_ptr<AMQBody> body, Correlator::Action action) +RequestId ChannelAdapter::send(shared_ptr<AMQBody> body) { RequestId requestId = 0; assertChannelOpen(); - switch (body->type()) { - case REQUEST_BODY: { - AMQRequestBody::shared_ptr request = - boost::shared_polymorphic_downcast<AMQRequestBody>(body); - requester.sending(request->getData()); - requestId = request->getData().requestId; - if (!action.empty()) - correlator.request(requestId, action); - break; - } - case RESPONSE_BODY: { - AMQResponseBody::shared_ptr response = - boost::shared_polymorphic_downcast<AMQResponseBody>(body); - responder.sending(response->getData()); - break; - } - // No action required for other body types. - } AMQFrame frame(getVersion(), getId(), body); handlers.out->handle(frame); return requestId; } -void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { - assertMethodOk(*request); - AMQRequestBody::Data& requestData = request->getData(); - responder.received(requestData); - handleMethodInContext(request, MethodContext(this, request)); -} - -void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { - assertMethodOk(*response); - AMQResponseBody::Data& responseData = response->getData(); - - // FIXME aconway 2007-04-05: processed should be last - // but causes problems with InProcessBroker tests because - // we execute client code in handleMethod. - // Need to introduce a queue & 2 threads for inprocess. - requester.processed(responseData); - // FIXME aconway 2007-04-04: exception handling. - correlator.response(response); - handleMethod(response); -} - void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) { assertMethodOk(*method); handleMethodInContext(method, MethodContext(this, method)); diff --git a/qpid/cpp/src/qpid/framing/ChannelAdapter.h b/qpid/cpp/src/qpid/framing/ChannelAdapter.h index a7c9c61640..84a626c864 100644 --- a/qpid/cpp/src/qpid/framing/ChannelAdapter.h +++ b/qpid/cpp/src/qpid/framing/ChannelAdapter.h @@ -27,9 +27,7 @@ #include "qpid/shared_ptr.h" #include "BodyHandler.h" -#include "Requester.h" -#include "Responder.h" -#include "Correlator.h" +#include "ProtocolVersion.h" #include "amqp_types.h" #include "FrameHandler.h" @@ -37,6 +35,7 @@ namespace qpid { namespace framing { class MethodContext; +class OutputHandler; /** * Base class for client and broker channels. @@ -71,19 +70,12 @@ class ChannelAdapter : protected BodyHandler { /** * Send a frame. *@param body Body of the frame. - *@param action optional action to execute when we receive a - *response to this frame. Ignored if body is not a Request. *@return If body is a request, the ID assigned else 0. */ - virtual RequestId send(shared_ptr<AMQBody> body, - Correlator::Action action=Correlator::Action()); + virtual RequestId send(shared_ptr<AMQBody> body); virtual bool isOpen() const = 0; - RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); } - RequestId getLastAckRequest() { return requester.getLastAckRequest(); } - RequestId getNextSendRequestId() { return requester.getNextId(); } - protected: void assertMethodOk(AMQMethodBody& method) const; void assertChannelOpen() const; @@ -98,14 +90,9 @@ class ChannelAdapter : protected BodyHandler { friend class ChannelAdapterHandler; void handleMethod(shared_ptr<AMQMethodBody>); - void handleRequest(shared_ptr<AMQRequestBody>); - void handleResponse(shared_ptr<AMQResponseBody>); ChannelId id; ProtocolVersion version; - Requester requester; - Responder responder; - Correlator correlator; FrameHandler::Chains handlers; }; diff --git a/qpid/cpp/src/qpid/framing/Correlator.cpp b/qpid/cpp/src/qpid/framing/Correlator.cpp deleted file mode 100644 index feeb3aa935..0000000000 --- a/qpid/cpp/src/qpid/framing/Correlator.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "Correlator.h" - -namespace qpid { -namespace framing { - -void Correlator::request(RequestId id, Action action) { - actions[id] = action; -} - -bool Correlator::response(shared_ptr<AMQResponseBody> r) { - Actions::iterator begin = actions.lower_bound(r->getRequestId()); - RequestId last = r->getRequestId()+r->getBatchOffset(); - Actions::iterator i = begin; - bool didAction = false; - for( ; i != actions.end() && i->first <= last; ++i) { - didAction = true; - // FIXME aconway 2007-04-04: handle exceptions thrown by action. - i->second(r); - } - actions.erase(begin, i); - return didAction; -} - - -}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Correlator.h b/qpid/cpp/src/qpid/framing/Correlator.h deleted file mode 100644 index 35dd7514ae..0000000000 --- a/qpid/cpp/src/qpid/framing/Correlator.h +++ /dev/null @@ -1,68 +0,0 @@ -#ifndef _framing_Correlator_h -#define _framing_Correlator_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/shared_ptr.h" -#include "qpid/framing/AMQResponseBody.h" -#include <boost/function.hpp> -#include <map> - -namespace qpid { -namespace framing { - -/** - * Correlate responses with actions established when sending the request. - * - * THREAD UNSAFE. - */ -class Correlator -{ - public: - typedef shared_ptr<AMQResponseBody> ResponsePtr; - typedef boost::function<void (ResponsePtr)> Action; - - /** - * Note that request with id was sent, record an action to call - * when a response arrives. - */ - void request(RequestId id, Action doOnResponse); - - /** - * Note response received, call action for associated request if any. - * Return true of some action(s) were executed. - */ - bool response(shared_ptr<AMQResponseBody>); - - /** - * Note the given execution mark was received, call actions - * for any requests that are impicitly responded to. - */ - void mark(RequestId mark); - - private: - typedef std::map<RequestId, Action> Actions; - Actions actions; -}; - -}} // namespace qpid::framing - - - -#endif /*!_framing_Correlator_h*/ diff --git a/qpid/cpp/src/qpid/framing/Frame.cpp b/qpid/cpp/src/qpid/framing/Frame.cpp index 7bdc0adf00..1ba8112faa 100644 --- a/qpid/cpp/src/qpid/framing/Frame.cpp +++ b/qpid/cpp/src/qpid/framing/Frame.cpp @@ -22,8 +22,6 @@ #include "Frame.h" #include "qpid/QpidError.h" -#include "AMQRequestBody.h" -#include "AMQResponseBody.h" namespace qpid { diff --git a/qpid/cpp/src/qpid/framing/FramingContent.h b/qpid/cpp/src/qpid/framing/FramingContent.h index c813408cf3..9315a7716f 100644 --- a/qpid/cpp/src/qpid/framing/FramingContent.h +++ b/qpid/cpp/src/qpid/framing/FramingContent.h @@ -26,6 +26,8 @@ namespace qpid { namespace framing { +class Buffer; + enum discriminator_types { INLINE = 0, REFERENCE = 1 }; /** diff --git a/qpid/cpp/src/qpid/framing/MethodContext.cpp b/qpid/cpp/src/qpid/framing/MethodContext.cpp deleted file mode 100644 index 9dc42dcfd7..0000000000 --- a/qpid/cpp/src/qpid/framing/MethodContext.cpp +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "MethodContext.h" -#include "amqp_types.h" -#include "AMQRequestBody.h" - -namespace qpid { -namespace framing { - -RequestId MethodContext::getRequestId() const { - if (methodBody->type() == REQUEST_BODY) { - return boost::shared_polymorphic_cast<AMQRequestBody>(methodBody)->getRequestId(); - } else { - return 0; - } -} - -}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/MethodContext.h b/qpid/cpp/src/qpid/framing/MethodContext.h index 80e4c55d7e..102dc279d4 100644 --- a/qpid/cpp/src/qpid/framing/MethodContext.h +++ b/qpid/cpp/src/qpid/framing/MethodContext.h @@ -59,12 +59,6 @@ struct MethodContext * It's also provides the request ID when constructing a response. */ BodyPtr methodBody; - - /** - * Return methodBody's request ID. - * It is an error to call this if methodBody is not a request. - */ - RequestId getRequestId() const; }; diff --git a/qpid/cpp/src/qpid/framing/Requester.cpp b/qpid/cpp/src/qpid/framing/Requester.cpp deleted file mode 100644 index a675f0a61b..0000000000 --- a/qpid/cpp/src/qpid/framing/Requester.cpp +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/format.hpp> - -#include "Requester.h" -#include "qpid/QpidError.h" - -namespace qpid { -namespace framing { - -Requester::Requester() : lastId(0), responseMark(0) {} - -void Requester::sending(AMQRequestBody::Data& request) { - request.requestId = ++lastId; - request.responseMark = responseMark; -} - -void Requester::processed(const AMQResponseBody::Data& response) { - responseMark = response.responseId; - firstAckRequest = response.requestId; - lastAckRequest = firstAckRequest + response.batchOffset; -} - -}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Requester.h b/qpid/cpp/src/qpid/framing/Requester.h deleted file mode 100644 index 65bdc9a5a1..0000000000 --- a/qpid/cpp/src/qpid/framing/Requester.h +++ /dev/null @@ -1,68 +0,0 @@ -#ifndef _framing_Requester_h -#define _framing_Requester_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <set> -#include "AMQRequestBody.h" -#include "AMQResponseBody.h" - -namespace qpid { -namespace framing { - -class AMQRequestBody; -class AMQResponseBody; - -/** - * Manage request IDs and the response mark for locally initiated requests. - * - * THREAD UNSAFE: must be locked externally. - */ -class Requester -{ - public: - Requester(); - - /** Called before sending a request to set request data. */ - void sending(AMQRequestBody::Data&); - - /** Called after processing a response. */ - void processed(const AMQResponseBody::Data&); - - /** Get the next request id to be used. */ - RequestId getNextId() { return lastId + 1; } - - /** Get the first request acked by last response */ - RequestId getFirstAckRequest() { return firstAckRequest; } - - /** Get the last request acked by last response */ - RequestId getLastAckRequest() { return lastAckRequest; } - - private: - RequestId lastId; - ResponseId responseMark; - ResponseId firstAckRequest; - ResponseId lastAckRequest; -}; - -}} // namespace qpid::framing - - - -#endif /*!_framing_Requester_h*/ diff --git a/qpid/cpp/src/qpid/framing/Responder.cpp b/qpid/cpp/src/qpid/framing/Responder.cpp deleted file mode 100644 index 1b9c8a6c59..0000000000 --- a/qpid/cpp/src/qpid/framing/Responder.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <boost/format.hpp> - -#include "Responder.h" -#include "qpid/QpidError.h" - -namespace qpid { -namespace framing { - -Responder::Responder() : lastId(0), responseMark(0) {} - -void Responder::received(const AMQRequestBody::Data& request) { - if (request.responseMark < responseMark || request.responseMark > lastId) - THROW_QPID_ERROR( - PROTOCOL_ERROR, boost::format("Invalid response mark %d.") - %request.responseMark); - responseMark = request.responseMark; -} - -void Responder::sending(AMQResponseBody::Data& response) { - response.responseId = ++lastId; - assert(response.requestId); // Should be already set. - response.batchOffset = 0; -} - -}} // namespace qpid::framing - diff --git a/qpid/cpp/src/qpid/framing/Responder.h b/qpid/cpp/src/qpid/framing/Responder.h deleted file mode 100644 index 0e1785256b..0000000000 --- a/qpid/cpp/src/qpid/framing/Responder.h +++ /dev/null @@ -1,61 +0,0 @@ -#ifndef _framing_Responder_h -#define _framing_Responder_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "AMQRequestBody.h" -#include "AMQResponseBody.h" - -namespace qpid { -namespace framing { - -/** - * Manage response ids and response mark remotely initianted requests. - * - * THREAD UNSAFE: This class is called as frames are sent or received - * sequentially on a connection, so it does not need to be thread safe. - */ -class Responder -{ - public: - Responder(); - - /** Called after receiving a request. */ - void received(const AMQRequestBody::Data& request); - - /** Called before sending a response to set respose data. */ - void sending(AMQResponseBody::Data& response); - - /** Get the ID of the highest response acknowledged by the peer. */ - ResponseId getResponseMark() { return responseMark; } - - // TODO aconway 2007-01-14: Batching support - store unsent - // Response for equality comparison with subsequent responses. - // - - private: - ResponseId lastId; - ResponseId responseMark; -}; - -}} // namespace qpid::framing - - - -#endif /*!_framing_Responder_h*/ diff --git a/qpid/cpp/src/tests/FramingTest.cpp b/qpid/cpp/src/tests/FramingTest.cpp index f172d1765e..b0aeb9db6f 100644 --- a/qpid/cpp/src/tests/FramingTest.cpp +++ b/qpid/cpp/src/tests/FramingTest.cpp @@ -27,16 +27,11 @@ #include <typeinfo> #include "qpid/QpidError.h" #include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/AMQRequestBody.h" -#include "qpid/framing/AMQResponseBody.h" -#include "qpid/framing/Requester.h" -#include "qpid/framing/Responder.h" #include "InProcessBroker.h" #include "qpid/client/Connection.h" #include "qpid/client/Connector.h" #include "qpid/client/ClientExchange.h" #include "qpid/client/ClientQueue.h" -#include "qpid/framing/Correlator.h" #include "qpid/framing/BasicGetOkBody.h" #include <memory> #include <boost/lexical_cast.hpp> @@ -64,11 +59,6 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_TEST(testBasicConsumeBody); CPPUNIT_TEST(testConnectionRedirectBodyFrame); CPPUNIT_TEST(testBasicConsumeOkBodyFrame); - CPPUNIT_TEST(testRequestBodyFrame); - CPPUNIT_TEST(testResponseBodyFrame); - CPPUNIT_TEST(testRequester); - CPPUNIT_TEST(testResponder); - CPPUNIT_TEST(testCorrelator); CPPUNIT_TEST(testInlineContent); CPPUNIT_TEST(testContentReference); CPPUNIT_TEST(testContentValidation); @@ -168,32 +158,6 @@ class FramingTest : public CppUnit::TestCase } } - void testRequestBodyFrame() { - std::string testing("testing"); - AMQBody::shared_ptr request(new ChannelOpenBody(version, testing)); - AMQFrame in(version, 999, request); - in.encode(buffer); - buffer.flip(); - AMQFrame out; - out.decode(buffer); - ChannelOpenBody* decoded = - dynamic_cast<ChannelOpenBody*>(out.getBody().get()); - CPPUNIT_ASSERT(decoded); - CPPUNIT_ASSERT_EQUAL(testing, decoded->getOutOfBand()); - } - - void testResponseBodyFrame() { - AMQBody::shared_ptr response(new ChannelOpenOkBody(version)); - AMQFrame in(version, 999, response); - in.encode(buffer); - buffer.flip(); - AMQFrame out; - out.decode(buffer); - ChannelOpenOkBody* decoded = - dynamic_cast<ChannelOpenOkBody*>(out.getBody().get()); - CPPUNIT_ASSERT(decoded); - } - void testInlineContent() { Content content(INLINE, "MyData"); CPPUNIT_ASSERT(content.isInline()); @@ -247,140 +211,6 @@ class FramingTest : public CppUnit::TestCase } - void testRequester() { - Requester r; - AMQRequestBody::Data q; - AMQResponseBody::Data p; - - r.sending(q); - CPPUNIT_ASSERT_EQUAL(RequestId(1), q.requestId); - CPPUNIT_ASSERT_EQUAL(ResponseId(0), q.responseMark); - - r.sending(q); - CPPUNIT_ASSERT_EQUAL(RequestId(2), q.requestId); - CPPUNIT_ASSERT_EQUAL(ResponseId(0), q.responseMark); - - // Now process a response - p.responseId = 1; - p.requestId = 2; - r.processed(AMQResponseBody::Data(1, 2)); - - r.sending(q); - CPPUNIT_ASSERT_EQUAL(RequestId(3), q.requestId); - CPPUNIT_ASSERT_EQUAL(ResponseId(1), q.responseMark); - - try { - r.processed(p); // Already processed this response. - CPPUNIT_FAIL("Expected exception"); - } catch (...) {} - - try { - p.requestId = 50; - r.processed(p); // No such request - CPPUNIT_FAIL("Expected exception"); - } catch (...) {} - - r.sending(q); // reqId=4 - r.sending(q); // reqId=5 - r.sending(q); // reqId=6 - p.responseId++; - p.requestId = 4; - p.batchOffset = 2; - r.processed(p); - r.sending(q); - CPPUNIT_ASSERT_EQUAL(RequestId(7), q.requestId); - CPPUNIT_ASSERT_EQUAL(ResponseId(2), q.responseMark); - - p.responseId++; - p.requestId = 1; // Out of order - p.batchOffset = 0; - r.processed(p); - r.sending(q); - CPPUNIT_ASSERT_EQUAL(RequestId(8), q.requestId); - CPPUNIT_ASSERT_EQUAL(ResponseId(3), q.responseMark); - } - - void testResponder() { - Responder r; - AMQRequestBody::Data q; - AMQResponseBody::Data p; - - q.requestId = 1; - q.responseMark = 0; - r.received(q); - p.requestId = q.requestId; - r.sending(p); - CPPUNIT_ASSERT_EQUAL(ResponseId(1), p.responseId); - CPPUNIT_ASSERT_EQUAL(RequestId(1), p.requestId); - CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset); - CPPUNIT_ASSERT_EQUAL(ResponseId(0), r.getResponseMark()); - - q.requestId++; - q.responseMark = 1; - r.received(q); - r.sending(p); - CPPUNIT_ASSERT_EQUAL(ResponseId(2), p.responseId); - CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset); - CPPUNIT_ASSERT_EQUAL(ResponseId(1), r.getResponseMark()); - - try { - // Response mark higher any request ID sent. - q.responseMark = 3; - r.received(q); - } catch(...) {} - - try { - // Response mark lower than previous response mark. - q.responseMark = 0; - r.received(q); - } catch(...) {} - - // TODO aconway 2007-01-14: Test for batching when supported. - - } - - - std::vector<Correlator::ResponsePtr> correlations; - - void correlatorCallback(Correlator::ResponsePtr r) { - correlations.push_back(r); - } - - struct DummyResponse : public AMQResponseBody { - DummyResponse(ResponseId id=0, RequestId req=0, BatchOffset off=0) - : AMQResponseBody(version, id, req, off) {} - uint32_t size() const { return 0; } - void print(std::ostream&) const {} - MethodId amqpMethodId() const { return 0; } - ClassId amqpClassId() const { return 0; } - void encodeContent(Buffer& ) const {} - void decodeContent(Buffer& ) {} - }; - - void testCorrelator() { - CPPUNIT_ASSERT(correlations.empty()); - Correlator c; - Correlator::Action action = boost::bind(&FramingTest::correlatorCallback, this, _1); - c.request(5, action); - Correlator::ResponsePtr r1(new DummyResponse(3, 5, 0)); - CPPUNIT_ASSERT(c.response(r1)); - CPPUNIT_ASSERT_EQUAL(size_t(1), correlations.size()); - CPPUNIT_ASSERT(correlations.front() == r1); - correlations.clear(); - - c.request(6, action); - c.request(7, action); - c.request(8, action); - Correlator::ResponsePtr r2(new DummyResponse(4, 6, 3)); - CPPUNIT_ASSERT(c.response(r2)); - CPPUNIT_ASSERT_EQUAL(size_t(3), correlations.size()); - CPPUNIT_ASSERT(r2 == correlations[0]); - CPPUNIT_ASSERT(r2 == correlations[1]); - CPPUNIT_ASSERT(r2 == correlations[2]); - Correlator::ResponsePtr r3(new DummyResponse(5, 99, 0)); - CPPUNIT_ASSERT(!c.response(r3)); - } - // expect may contain null chars so use string(ptr,size) constructor // Use sizeof(expect)-1 to strip the trailing null. #define ASSERT_FRAME(expect, frame) \ |