summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-14 18:30:25 +0000
committerAlan Conway <aconway@apache.org>2007-01-14 18:30:25 +0000
commit972d4b29d086c803ffc4bee08c8c8eb0f2e788af (patch)
treebe8f8780e1d88a527bff688acdc2851bcaa9da8a /cpp
parent7abe972eb67e965bc1ec8d171c4dadba2db5afae (diff)
downloadqpid-python-972d4b29d086c803ffc4bee08c8c8eb0f2e788af.tar.gz
* Added Requester/Responder classes to manage request-ids, response-ids,
and response-mark. Response batches not yet supported. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496110 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp3
-rw-r--r--cpp/lib/common/Makefile.am2
-rw-r--r--cpp/lib/common/framing/AMQRequestBody.h22
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.h22
-rw-r--r--cpp/lib/common/framing/Requester.cpp46
-rw-r--r--cpp/lib/common/framing/Requester.h59
-rw-r--r--cpp/lib/common/framing/Responder.cpp40
-rw-r--r--cpp/lib/common/framing/Responder.h61
-rw-r--r--cpp/tests/FramingTest.cpp97
9 files changed, 327 insertions, 25 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index adf2758eda..7a44d9f872 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -224,7 +224,6 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin
parent->framemax, parent->queues->getStore(),
parent->settings.stagingThreshold);
- // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
parent->client->getChannel().openOk(channel, std::string()/* ID */);
}
@@ -279,7 +278,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::unbind(
const string& /*routingKey*/,
const qpid::framing::FieldTable& /*arguments*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am
index 941b30f8e5..541145ac97 100644
--- a/cpp/lib/common/Makefile.am
+++ b/cpp/lib/common/Makefile.am
@@ -72,6 +72,8 @@ libqpidcommon_la_SOURCES = \
$(framing)/ProtocolInitiation.cpp \
$(framing)/ProtocolVersion.cpp \
$(framing)/ProtocolVersionException.cpp \
+ $(framing)/Requester.cpp \
+ $(framing)/Responder.cpp \
$(framing)/Value.cpp \
$(gen)/AMQP_ClientProxy.cpp \
$(gen)/AMQP_HighestVersion.h \
diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h
index 1836a7d49d..74aa398606 100644
--- a/cpp/lib/common/framing/AMQRequestBody.h
+++ b/cpp/lib/common/framing/AMQRequestBody.h
@@ -31,7 +31,17 @@ class AMQRequestBody : public AMQMethodBody
{
public:
typedef boost::shared_ptr<AMQRequestBody> shared_ptr;
-
+
+ struct Data {
+ Data(RequestId id=0, ResponseId mark=0)
+ : requestId(id), responseMark(mark) {}
+ void encode(Buffer&) const;
+ void decode(Buffer&);
+
+ RequestId requestId;
+ ResponseId responseMark;
+ };
+
static shared_ptr create(
AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
Buffer& buffer);
@@ -51,16 +61,6 @@ class AMQRequestBody : public AMQMethodBody
static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+16; }
private:
- struct Data {
- Data(RequestId id=0, ResponseId mark=0)
- : requestId(id), responseMark(mark) {}
- void encode(Buffer&) const;
- void decode(Buffer&);
-
- RequestId requestId;
- ResponseId responseMark;
- };
-
Data data;
};
diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h
index bfd6cb2b9a..d7095d3da0 100644
--- a/cpp/lib/common/framing/AMQResponseBody.h
+++ b/cpp/lib/common/framing/AMQResponseBody.h
@@ -35,6 +35,17 @@ class AMQResponseBody : public AMQMethodBody
public:
typedef boost::shared_ptr<AMQResponseBody> shared_ptr;
+ struct Data {
+ Data(ResponseId id=0, RequestId req=0, BatchOffset off=0)
+ : responseId(id), requestId(req), batchOffset(off) {}
+ void encode(Buffer&) const;
+ void decode(Buffer&);
+
+ u_int64_t responseId;
+ u_int64_t requestId;
+ u_int32_t batchOffset;
+ };
+
static shared_ptr create(
AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
Buffer& buffer);
@@ -53,17 +64,6 @@ class AMQResponseBody : public AMQMethodBody
protected:
static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; }
private:
- struct Data {
- Data(ResponseId id=0, RequestId req=0, BatchOffset off=0)
- : responseId(id), requestId(req), batchOffset(off) {}
- void encode(Buffer&) const;
- void decode(Buffer&);
-
- u_int64_t responseId;
- u_int64_t requestId;
- u_int32_t batchOffset;
- };
-
Data data;
};
diff --git a/cpp/lib/common/framing/Requester.cpp b/cpp/lib/common/framing/Requester.cpp
new file mode 100644
index 0000000000..1dd3cd4ce9
--- /dev/null
+++ b/cpp/lib/common/framing/Requester.cpp
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Requester.h"
+#include "QpidError.h"
+
+namespace qpid {
+namespace framing {
+
+Requester::Requester() : lastId(0), responseMark(0) {}
+
+void Requester::sending(AMQRequestBody::Data& request) {
+ request.requestId = ++lastId;
+ request.responseMark = responseMark;
+ requests.insert(request.requestId);
+}
+
+void Requester::processed(const AMQResponseBody::Data& response) {
+ responseMark = response.responseId;
+ RequestId id = response.requestId;
+ RequestId end = id + response.batchOffset;
+ for ( ; id < end; ++id) {
+ std::set<RequestId>::iterator i = requests.find(id);
+ if (i == requests.end())
+ // TODO aconway 2007-01-12: Verify this is the right exception.
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid response.");
+ requests.erase(i);
+ }
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h
new file mode 100644
index 0000000000..e24848f98a
--- /dev/null
+++ b/cpp/lib/common/framing/Requester.h
@@ -0,0 +1,59 @@
+#ifndef _framing_Requester_h
+#define _framing_Requester_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <set>
+#include "AMQRequestBody.h"
+#include "AMQResponseBody.h"
+
+namespace qpid {
+namespace framing {
+
+class AMQRequestBody;
+class AMQResponseBody;
+
+/**
+ * Manage request IDs and the response mark for locally initiated requests.
+ *
+ * THREAD UNSAFE: This class is called as frames are sent or received
+ * sequentially on a connection, so it does not need to be thread safe.
+ */
+class Requester
+{
+ public:
+ Requester();
+
+ /** Called before sending a request to set request data. */
+ void sending(AMQRequestBody::Data&);
+
+ /** Called after processing a response. */
+ void processed(const AMQResponseBody::Data&);
+
+ private:
+ std::set<RequestId> requests; /** Sent but not responded to */
+ RequestId lastId;
+ ResponseId responseMark;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_Requester_h*/
diff --git a/cpp/lib/common/framing/Responder.cpp b/cpp/lib/common/framing/Responder.cpp
new file mode 100644
index 0000000000..efe3609c7b
--- /dev/null
+++ b/cpp/lib/common/framing/Responder.cpp
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Responder.h"
+#include "QpidError.h"
+
+namespace qpid {
+namespace framing {
+
+Responder::Responder() : lastId(0), responseMark(0) {}
+
+void Responder::received(const AMQRequestBody::Data& request) {
+ if (request.responseMark < responseMark || request.responseMark > lastId)
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid resposne mark");
+ responseMark = request.responseMark;
+}
+
+void Responder::sending(AMQResponseBody::Data& response, RequestId toRequest) {
+ response.responseId = ++lastId;
+ response.requestId = toRequest;
+ response.batchOffset = 0;
+}
+
+}} // namespace qpid::framing
+
diff --git a/cpp/lib/common/framing/Responder.h b/cpp/lib/common/framing/Responder.h
new file mode 100644
index 0000000000..a11967acc1
--- /dev/null
+++ b/cpp/lib/common/framing/Responder.h
@@ -0,0 +1,61 @@
+#ifndef _framing_Responder_h
+#define _framing_Responder_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQRequestBody.h"
+#include "AMQResponseBody.h"
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Manage response ids and response mark remotely initianted requests.
+ *
+ * THREAD UNSAFE: This class is called as frames are sent or received
+ * sequentially on a connection, so it does not need to be thread safe.
+ */
+class Responder
+{
+ public:
+ Responder();
+
+ /** Called after receiving a request. */
+ void received(const AMQRequestBody::Data& request);
+
+ /** Called before sending a response to set respose data. */
+ void sending(AMQResponseBody::Data& response, RequestId toRequest);
+
+ /** Get the ID of the highest response acknowledged by the peer. */
+ ResponseId getResponseMark() { return responseMark; }
+
+ // TODO aconway 2007-01-14: Batching support - store unsent
+ // Response for equality comparison with subsequent responses.
+ //
+
+ private:
+ ResponseId lastId;
+ ResponseId responseMark;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_Responder_h*/
diff --git a/cpp/tests/FramingTest.cpp b/cpp/tests/FramingTest.cpp
index 5b90d9c288..dc895506c9 100644
--- a/cpp/tests/FramingTest.cpp
+++ b/cpp/tests/FramingTest.cpp
@@ -28,7 +28,8 @@
#include <AMQP_HighestVersion.h>
#include "AMQRequestBody.h"
#include "AMQResponseBody.h"
-
+#include "Requester.h"
+#include "Responder.h"
using namespace qpid::framing;
@@ -52,6 +53,8 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_TEST(testBasicConsumeOkBodyFrame);
CPPUNIT_TEST(testRequestBodyFrame);
CPPUNIT_TEST(testResponseBodyFrame);
+ CPPUNIT_TEST(testRequester);
+ CPPUNIT_TEST(testResponder);
CPPUNIT_TEST_SUITE_END();
private:
@@ -171,6 +174,98 @@ class FramingTest : public CppUnit::TestCase
dynamic_cast<ChannelOkBody*>(out.getBody().get());
CPPUNIT_ASSERT(decoded);
}
+
+ void testRequester() {
+ Requester r;
+ AMQRequestBody::Data q;
+ AMQResponseBody::Data p;
+
+ r.sending(q);
+ CPPUNIT_ASSERT_EQUAL(1ULL, q.requestId);
+ CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark);
+
+ r.sending(q);
+ CPPUNIT_ASSERT_EQUAL(2ULL, q.requestId);
+ CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark);
+
+ // Now process a response
+ p.responseId = 1;
+ p.requestId = 2;
+ r.processed(AMQResponseBody::Data(1, 2));
+
+ r.sending(q);
+ CPPUNIT_ASSERT_EQUAL(3ULL, q.requestId);
+ CPPUNIT_ASSERT_EQUAL(1ULL, q.responseMark);
+
+ try {
+ r.processed(p); // Already processed this response.
+ CPPUNIT_FAIL("Expected exception");
+ } catch (...) {}
+
+ try {
+ p.requestId = 50;
+ r.processed(p); // No such request
+ CPPUNIT_FAIL("Expected exception");
+ } catch (...) {}
+
+ r.sending(q); // reqId=4
+ r.sending(q); // reqId=5
+ r.sending(q); // reqId=6
+ p.responseId++;
+ p.requestId = 4;
+ p.batchOffset = 2;
+ r.processed(p);
+ r.sending(q);
+ CPPUNIT_ASSERT_EQUAL(7ULL, q.requestId);
+ CPPUNIT_ASSERT_EQUAL(2ULL, q.responseMark);
+
+ p.responseId++;
+ p.requestId = 1; // Out of order
+ p.batchOffset = 0;
+ r.processed(p);
+ r.sending(q);
+ CPPUNIT_ASSERT_EQUAL(8ULL, q.requestId);
+ CPPUNIT_ASSERT_EQUAL(3ULL, q.responseMark);
+ }
+
+ void testResponder() {
+ Responder r;
+ AMQRequestBody::Data q;
+ AMQResponseBody::Data p;
+
+ q.requestId = 1;
+ q.responseMark = 0;
+ r.received(q);
+ r.sending(p, q.requestId);
+ CPPUNIT_ASSERT_EQUAL(1ULL, p.responseId);
+ CPPUNIT_ASSERT_EQUAL(1ULL, p.requestId);
+ CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset);
+ CPPUNIT_ASSERT_EQUAL(0ULL, r.getResponseMark());
+
+ q.requestId++;
+ q.responseMark = 1;
+ r.received(q);
+ r.sending(p, q.requestId);
+ CPPUNIT_ASSERT_EQUAL(2ULL, p.responseId);
+ CPPUNIT_ASSERT_EQUAL(2ULL, p.requestId);
+ CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset);
+ CPPUNIT_ASSERT_EQUAL(1ULL, r.getResponseMark());
+
+ try {
+ // Response mark higher any request ID sent.
+ q.responseMark = 3;
+ r.received(q);
+ } catch(...) {}
+
+ try {
+ // Response mark lower than previous response mark.
+ q.responseMark = 0;
+ r.received(q);
+ } catch(...) {}
+
+ // TODO aconway 2007-01-14: Test for batching when supported.
+
+ }
};