diff options
author | Alan Conway <aconway@apache.org> | 2007-01-14 18:30:25 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-01-14 18:30:25 +0000 |
commit | 972d4b29d086c803ffc4bee08c8c8eb0f2e788af (patch) | |
tree | be8f8780e1d88a527bff688acdc2851bcaa9da8a /cpp | |
parent | 7abe972eb67e965bc1ec8d171c4dadba2db5afae (diff) | |
download | qpid-python-972d4b29d086c803ffc4bee08c8c8eb0f2e788af.tar.gz |
* Added Requester/Responder classes to manage request-ids, response-ids,
and response-mark. Response batches not yet supported.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496110 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 3 | ||||
-rw-r--r-- | cpp/lib/common/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQRequestBody.h | 22 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQResponseBody.h | 22 | ||||
-rw-r--r-- | cpp/lib/common/framing/Requester.cpp | 46 | ||||
-rw-r--r-- | cpp/lib/common/framing/Requester.h | 59 | ||||
-rw-r--r-- | cpp/lib/common/framing/Responder.cpp | 40 | ||||
-rw-r--r-- | cpp/lib/common/framing/Responder.h | 61 | ||||
-rw-r--r-- | cpp/tests/FramingTest.cpp | 97 |
9 files changed, 327 insertions, 25 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index adf2758eda..7a44d9f872 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -224,7 +224,6 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin parent->framemax, parent->queues->getStore(), parent->settings.stagingThreshold); - // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 parent->client->getChannel().openOk(channel, std::string()/* ID */); } @@ -279,7 +278,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::unbind( const string& /*routingKey*/, const qpid::framing::FieldTable& /*arguments*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index 941b30f8e5..541145ac97 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -72,6 +72,8 @@ libqpidcommon_la_SOURCES = \ $(framing)/ProtocolInitiation.cpp \ $(framing)/ProtocolVersion.cpp \ $(framing)/ProtocolVersionException.cpp \ + $(framing)/Requester.cpp \ + $(framing)/Responder.cpp \ $(framing)/Value.cpp \ $(gen)/AMQP_ClientProxy.cpp \ $(gen)/AMQP_HighestVersion.h \ diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h index 1836a7d49d..74aa398606 100644 --- a/cpp/lib/common/framing/AMQRequestBody.h +++ b/cpp/lib/common/framing/AMQRequestBody.h @@ -31,7 +31,17 @@ class AMQRequestBody : public AMQMethodBody { public: typedef boost::shared_ptr<AMQRequestBody> shared_ptr; - + + struct Data { + Data(RequestId id=0, ResponseId mark=0) + : requestId(id), responseMark(mark) {} + void encode(Buffer&) const; + void decode(Buffer&); + + RequestId requestId; + ResponseId responseMark; + }; + static shared_ptr create( AMQP_MethodVersionMap& versionMap, ProtocolVersion version, Buffer& buffer); @@ -51,16 +61,6 @@ class AMQRequestBody : public AMQMethodBody static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+16; } private: - struct Data { - Data(RequestId id=0, ResponseId mark=0) - : requestId(id), responseMark(mark) {} - void encode(Buffer&) const; - void decode(Buffer&); - - RequestId requestId; - ResponseId responseMark; - }; - Data data; }; diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h index bfd6cb2b9a..d7095d3da0 100644 --- a/cpp/lib/common/framing/AMQResponseBody.h +++ b/cpp/lib/common/framing/AMQResponseBody.h @@ -35,6 +35,17 @@ class AMQResponseBody : public AMQMethodBody public: typedef boost::shared_ptr<AMQResponseBody> shared_ptr; + struct Data { + Data(ResponseId id=0, RequestId req=0, BatchOffset off=0) + : responseId(id), requestId(req), batchOffset(off) {} + void encode(Buffer&) const; + void decode(Buffer&); + + u_int64_t responseId; + u_int64_t requestId; + u_int32_t batchOffset; + }; + static shared_ptr create( AMQP_MethodVersionMap& versionMap, ProtocolVersion version, Buffer& buffer); @@ -53,17 +64,6 @@ class AMQResponseBody : public AMQMethodBody protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; } private: - struct Data { - Data(ResponseId id=0, RequestId req=0, BatchOffset off=0) - : responseId(id), requestId(req), batchOffset(off) {} - void encode(Buffer&) const; - void decode(Buffer&); - - u_int64_t responseId; - u_int64_t requestId; - u_int32_t batchOffset; - }; - Data data; }; diff --git a/cpp/lib/common/framing/Requester.cpp b/cpp/lib/common/framing/Requester.cpp new file mode 100644 index 0000000000..1dd3cd4ce9 --- /dev/null +++ b/cpp/lib/common/framing/Requester.cpp @@ -0,0 +1,46 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Requester.h" +#include "QpidError.h" + +namespace qpid { +namespace framing { + +Requester::Requester() : lastId(0), responseMark(0) {} + +void Requester::sending(AMQRequestBody::Data& request) { + request.requestId = ++lastId; + request.responseMark = responseMark; + requests.insert(request.requestId); +} + +void Requester::processed(const AMQResponseBody::Data& response) { + responseMark = response.responseId; + RequestId id = response.requestId; + RequestId end = id + response.batchOffset; + for ( ; id < end; ++id) { + std::set<RequestId>::iterator i = requests.find(id); + if (i == requests.end()) + // TODO aconway 2007-01-12: Verify this is the right exception. + THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid response."); + requests.erase(i); + } +} + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h new file mode 100644 index 0000000000..e24848f98a --- /dev/null +++ b/cpp/lib/common/framing/Requester.h @@ -0,0 +1,59 @@ +#ifndef _framing_Requester_h +#define _framing_Requester_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <set> +#include "AMQRequestBody.h" +#include "AMQResponseBody.h" + +namespace qpid { +namespace framing { + +class AMQRequestBody; +class AMQResponseBody; + +/** + * Manage request IDs and the response mark for locally initiated requests. + * + * THREAD UNSAFE: This class is called as frames are sent or received + * sequentially on a connection, so it does not need to be thread safe. + */ +class Requester +{ + public: + Requester(); + + /** Called before sending a request to set request data. */ + void sending(AMQRequestBody::Data&); + + /** Called after processing a response. */ + void processed(const AMQResponseBody::Data&); + + private: + std::set<RequestId> requests; /** Sent but not responded to */ + RequestId lastId; + ResponseId responseMark; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_Requester_h*/ diff --git a/cpp/lib/common/framing/Responder.cpp b/cpp/lib/common/framing/Responder.cpp new file mode 100644 index 0000000000..efe3609c7b --- /dev/null +++ b/cpp/lib/common/framing/Responder.cpp @@ -0,0 +1,40 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Responder.h" +#include "QpidError.h" + +namespace qpid { +namespace framing { + +Responder::Responder() : lastId(0), responseMark(0) {} + +void Responder::received(const AMQRequestBody::Data& request) { + if (request.responseMark < responseMark || request.responseMark > lastId) + THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid resposne mark"); + responseMark = request.responseMark; +} + +void Responder::sending(AMQResponseBody::Data& response, RequestId toRequest) { + response.responseId = ++lastId; + response.requestId = toRequest; + response.batchOffset = 0; +} + +}} // namespace qpid::framing + diff --git a/cpp/lib/common/framing/Responder.h b/cpp/lib/common/framing/Responder.h new file mode 100644 index 0000000000..a11967acc1 --- /dev/null +++ b/cpp/lib/common/framing/Responder.h @@ -0,0 +1,61 @@ +#ifndef _framing_Responder_h +#define _framing_Responder_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "AMQRequestBody.h" +#include "AMQResponseBody.h" + +namespace qpid { +namespace framing { + +/** + * Manage response ids and response mark remotely initianted requests. + * + * THREAD UNSAFE: This class is called as frames are sent or received + * sequentially on a connection, so it does not need to be thread safe. + */ +class Responder +{ + public: + Responder(); + + /** Called after receiving a request. */ + void received(const AMQRequestBody::Data& request); + + /** Called before sending a response to set respose data. */ + void sending(AMQResponseBody::Data& response, RequestId toRequest); + + /** Get the ID of the highest response acknowledged by the peer. */ + ResponseId getResponseMark() { return responseMark; } + + // TODO aconway 2007-01-14: Batching support - store unsent + // Response for equality comparison with subsequent responses. + // + + private: + ResponseId lastId; + ResponseId responseMark; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_Responder_h*/ diff --git a/cpp/tests/FramingTest.cpp b/cpp/tests/FramingTest.cpp index 5b90d9c288..dc895506c9 100644 --- a/cpp/tests/FramingTest.cpp +++ b/cpp/tests/FramingTest.cpp @@ -28,7 +28,8 @@ #include <AMQP_HighestVersion.h> #include "AMQRequestBody.h" #include "AMQResponseBody.h" - +#include "Requester.h" +#include "Responder.h" using namespace qpid::framing; @@ -52,6 +53,8 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_TEST(testBasicConsumeOkBodyFrame); CPPUNIT_TEST(testRequestBodyFrame); CPPUNIT_TEST(testResponseBodyFrame); + CPPUNIT_TEST(testRequester); + CPPUNIT_TEST(testResponder); CPPUNIT_TEST_SUITE_END(); private: @@ -171,6 +174,98 @@ class FramingTest : public CppUnit::TestCase dynamic_cast<ChannelOkBody*>(out.getBody().get()); CPPUNIT_ASSERT(decoded); } + + void testRequester() { + Requester r; + AMQRequestBody::Data q; + AMQResponseBody::Data p; + + r.sending(q); + CPPUNIT_ASSERT_EQUAL(1ULL, q.requestId); + CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark); + + r.sending(q); + CPPUNIT_ASSERT_EQUAL(2ULL, q.requestId); + CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark); + + // Now process a response + p.responseId = 1; + p.requestId = 2; + r.processed(AMQResponseBody::Data(1, 2)); + + r.sending(q); + CPPUNIT_ASSERT_EQUAL(3ULL, q.requestId); + CPPUNIT_ASSERT_EQUAL(1ULL, q.responseMark); + + try { + r.processed(p); // Already processed this response. + CPPUNIT_FAIL("Expected exception"); + } catch (...) {} + + try { + p.requestId = 50; + r.processed(p); // No such request + CPPUNIT_FAIL("Expected exception"); + } catch (...) {} + + r.sending(q); // reqId=4 + r.sending(q); // reqId=5 + r.sending(q); // reqId=6 + p.responseId++; + p.requestId = 4; + p.batchOffset = 2; + r.processed(p); + r.sending(q); + CPPUNIT_ASSERT_EQUAL(7ULL, q.requestId); + CPPUNIT_ASSERT_EQUAL(2ULL, q.responseMark); + + p.responseId++; + p.requestId = 1; // Out of order + p.batchOffset = 0; + r.processed(p); + r.sending(q); + CPPUNIT_ASSERT_EQUAL(8ULL, q.requestId); + CPPUNIT_ASSERT_EQUAL(3ULL, q.responseMark); + } + + void testResponder() { + Responder r; + AMQRequestBody::Data q; + AMQResponseBody::Data p; + + q.requestId = 1; + q.responseMark = 0; + r.received(q); + r.sending(p, q.requestId); + CPPUNIT_ASSERT_EQUAL(1ULL, p.responseId); + CPPUNIT_ASSERT_EQUAL(1ULL, p.requestId); + CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset); + CPPUNIT_ASSERT_EQUAL(0ULL, r.getResponseMark()); + + q.requestId++; + q.responseMark = 1; + r.received(q); + r.sending(p, q.requestId); + CPPUNIT_ASSERT_EQUAL(2ULL, p.responseId); + CPPUNIT_ASSERT_EQUAL(2ULL, p.requestId); + CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset); + CPPUNIT_ASSERT_EQUAL(1ULL, r.getResponseMark()); + + try { + // Response mark higher any request ID sent. + q.responseMark = 3; + r.received(q); + } catch(...) {} + + try { + // Response mark lower than previous response mark. + q.responseMark = 0; + r.received(q); + } catch(...) {} + + // TODO aconway 2007-01-14: Test for batching when supported. + + } }; |