summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-02-13 21:52:30 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-02-13 21:52:30 +0000
commit5d8e8d39e1e5e13d0753c53a8095f075895d01a1 (patch)
tree727d30e03ae1679827779560a32f11c12f32d4a5
parent9517deedff9691dbe3429b0b917dfd4208b0b1b8 (diff)
downloadqpid-python-5d8e8d39e1e5e13d0753c53a8095f075895d01a1.tar.gz
r1111@fuschia: andrew | 2007-02-09 15:51:10 +0000
Removed currently unused request tracking logic r1125@fuschia: andrew | 2007-02-13 21:51:30 +0000 Implemented receiveing batched Message.ok in c++ broker Implemented batched response frames in python client code git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507249 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/broker/AccumulatedAck.cpp12
-rw-r--r--cpp/lib/broker/AccumulatedAck.h2
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp6
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp29
-rw-r--r--cpp/lib/broker/BrokerChannel.h3
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.cpp3
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.h4
-rw-r--r--cpp/lib/common/framing/Requester.cpp15
-rw-r--r--cpp/lib/common/framing/Requester.h9
-rw-r--r--python/qpid/message.py9
-rw-r--r--python/qpid/peer.py13
-rw-r--r--python/tests/message.py12
12 files changed, 59 insertions, 58 deletions
diff --git a/cpp/lib/broker/AccumulatedAck.cpp b/cpp/lib/broker/AccumulatedAck.cpp
index a9826ba5ea..2f2a7464c1 100644
--- a/cpp/lib/broker/AccumulatedAck.cpp
+++ b/cpp/lib/broker/AccumulatedAck.cpp
@@ -24,12 +24,12 @@ using std::less_equal;
using std::bind2nd;
using namespace qpid::broker;
-void AccumulatedAck::update(u_int64_t tag, bool multiple){
- if(multiple){
- if(tag > range) range = tag;
- //else don't care, it is already counted
- }else if(tag > range){
- individual.push_back(tag);
+void AccumulatedAck::update(u_int64_t firstTag, u_int64_t lastTag){
+ if (firstTag-1 == range) {
+ range = lastTag;
+ } else {
+ for (u_int64_t tag = firstTag; tag<=lastTag; tag++)
+ individual.push_back(tag);
}
}
diff --git a/cpp/lib/broker/AccumulatedAck.h b/cpp/lib/broker/AccumulatedAck.h
index 055c8ea3e0..6ab0cfbe2e 100644
--- a/cpp/lib/broker/AccumulatedAck.h
+++ b/cpp/lib/broker/AccumulatedAck.h
@@ -43,7 +43,7 @@ namespace qpid {
*/
std::list<u_int64_t> individual;
- void update(u_int64_t tag, bool multiple);
+ void update(u_int64_t firstTag, u_int64_t lastTag);
void consolidate();
void clear();
bool covers(u_int64_t tag) const;
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index ec80241c66..49f0f24407 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -326,11 +326,7 @@ void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_
}
void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
- try{
- channel.ack(deliveryTag, multiple);
- }catch(InvalidAckException& e){
- throw ConnectionException(530, "Received ack for unrecognised delivery tag");
- }
+ channel.ack(deliveryTag, multiple);
}
void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 74e5504f17..674d0e9505 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -235,24 +235,37 @@ void Channel::complete(Message::shared_ptr msg) {
// TODO astitcher 2007-02-08 This only deals correctly with non batched responses
void Channel::ack(){
- ack(getRequestInProgress(), false);
+ ack(getFirstAckRequest(), getLastAckRequest());
}
-void Channel::ack(u_int64_t deliveryTag, bool multiple)
-{
+// Used by Basic
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ if (multiple)
+ ack(0, deliveryTag);
+ else
+ ack(deliveryTag, deliveryTag);
+}
+
+void Channel::ack(u_int64_t firstTag, u_int64_t lastTag){
if(transactional){
- accumulatedAck.update(deliveryTag, multiple);
+ //FIXME astitcher This only works for Basic style acks
+ accumulatedAck.update(lastTag, lastTag);
+
//TODO: I think the outstanding prefetch size & count should be updated at this point...
//TODO: ...this may then necessitate dispatching to consumers
}else{
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
- ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
+ ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
+ ack_iterator j = (firstTag == 0) ?
+ unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
+
if(i == unacked.end()){
- throw InvalidAckException();
- }else if(multiple){
+ throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+ }else if(i!=j){
ack_iterator end = ++i;
- for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard));
+ for_each(j, end, mem_fun_ref(&DeliveryRecord::discard));
unacked.erase(unacked.begin(), end);
//recalculate the prefetch:
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index 538e86b0a8..1a1c4dabba 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -138,6 +138,7 @@ class Channel : public framing::ChannelAdapter,
void rollback();
void ack();
void ack(u_int64_t deliveryTag, bool multiple);
+ void ack(u_int64_t deliveryTag, u_int64_t endTag);
void recover(bool requeue);
void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
void handlePublish(Message* msg);
@@ -153,8 +154,6 @@ class Channel : public framing::ChannelAdapter,
const framing::MethodContext& context);
};
-struct InvalidAckException{};
-
}} // namespace broker
diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp
index 40241660f2..8a1ff39ee5 100644
--- a/cpp/lib/common/framing/ChannelAdapter.cpp
+++ b/cpp/lib/common/framing/ChannelAdapter.cpp
@@ -61,7 +61,6 @@ void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
assertMethodOk(*request);
AMQRequestBody::Data& requestData = request->getData();
responder.received(requestData);
- requestInProgress = requestData.requestId;
handleMethodInContext(request, MethodContext(this, request));
}
@@ -71,8 +70,6 @@ void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
// Review - any cases where this is not the case?
AMQResponseBody::Data& responseData = response->getData();
requester.processed(responseData);
- // For a response this is taken to be the request being responded to (for convenience)
- requestInProgress = responseData.requestId;
handleMethod(response);
}
diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h
index 9f654d9a5b..36362a417a 100644
--- a/cpp/lib/common/framing/ChannelAdapter.h
+++ b/cpp/lib/common/framing/ChannelAdapter.h
@@ -85,7 +85,8 @@ class ChannelAdapter : public BodyHandler {
boost::shared_ptr<qpid::framing::AMQMethodBody> method,
const MethodContext& context) = 0;
- RequestId getRequestInProgress() { return requestInProgress; }
+ RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
+ RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
RequestId getNextSendRequestId() { return requester.getNextId(); }
private:
@@ -94,7 +95,6 @@ class ChannelAdapter : public BodyHandler {
ProtocolVersion version;
Requester requester;
Responder responder;
- RequestId requestInProgress;
};
}}
diff --git a/cpp/lib/common/framing/Requester.cpp b/cpp/lib/common/framing/Requester.cpp
index 37b2d37c86..9ee809e2ee 100644
--- a/cpp/lib/common/framing/Requester.cpp
+++ b/cpp/lib/common/framing/Requester.cpp
@@ -29,23 +29,12 @@ Requester::Requester() : lastId(0), responseMark(0) {}
void Requester::sending(AMQRequestBody::Data& request) {
request.requestId = ++lastId;
request.responseMark = responseMark;
- requests.insert(request.requestId);
}
void Requester::processed(const AMQResponseBody::Data& response) {
responseMark = response.responseId;
- RequestId id = response.requestId;
- RequestId end = id + response.batchOffset + 1;
- for ( ; id < end; ++id) {
- std::set<RequestId>::iterator i = requests.find(id);
- if (i != requests.end())
- requests.erase(i);
- else {
- THROW_QPID_ERROR(
- PROTOCOL_ERROR,
- boost::format("Response with non-existent request id=%d")%id);
- }
- }
+ firstAckRequest = response.requestId;
+ lastAckRequest = firstAckRequest + response.batchOffset;
}
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h
index dae5b1eaee..dcc4460041 100644
--- a/cpp/lib/common/framing/Requester.h
+++ b/cpp/lib/common/framing/Requester.h
@@ -46,13 +46,18 @@ class Requester
/** Called after processing a response. */
void processed(const AMQResponseBody::Data&);
- /** Get the next id to be used. */
+ /** Get the next request id to be used. */
RequestId getNextId() { return lastId + 1; }
+ /** Get the first request acked by this response */
+ RequestId getFirstAckRequest() { return firstAckRequest; }
+ /** Get the last request acked by this response */
+ RequestId getLastAckRequest() { return lastAckRequest; }
private:
- std::set<RequestId> requests; /** Sent but not responded to */
RequestId lastId;
ResponseId responseMark;
+ ResponseId firstAckRequest;
+ ResponseId lastAckRequest;
};
}} // namespace qpid::framing
diff --git a/python/qpid/message.py b/python/qpid/message.py
index 29c8654937..f80293180e 100644
--- a/python/qpid/message.py
+++ b/python/qpid/message.py
@@ -47,9 +47,12 @@ class Message:
else:
for r in self.method.responses:
if attr == r.name:
- result = lambda *args, **kwargs: \
- self.channel.respond(Method(r, r.arguments(*args, **kwargs)),
- self.frame)
+ def respond(*args, **kwargs):
+ batch=0
+ if kwargs.has_key("batchoffset"):
+ batch=kwargs.pop("batchoffset")
+ self.channel.respond(Method(r, r.arguments(*args, **kwargs)), batch, self.frame)
+ result = respond
break
else:
raise AttributeError(attr)
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 9f9644f17d..3f7be699c2 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -155,12 +155,15 @@ class Responder:
self.write = writer
self.sequence = Sequence(1)
- def respond(self, method, request):
+ def respond(self, method, batch, request):
if isinstance(request, Method):
self.write(method)
else:
- # XXX: batching
- frame = Response(self.sequence.next(), request.id, 0, method)
+ # allow batching from frame at either end
+ if batch<0:
+ frame = Response(self.sequence.next(), request.id+batch, -batch, method)
+ else:
+ frame = Response(self.sequence.next(), request.id, batch, method)
self.write(frame)
class Closed(Exception): pass
@@ -237,8 +240,8 @@ class Channel:
def request(self, method, listener, content = None):
self.requester.request(method, listener, content)
- def respond(self, method, request):
- self.responder.respond(method, request)
+ def respond(self, method, batch, request):
+ self.responder.respond(method, batch, request)
def invoke(self, type, args, kwargs):
content = kwargs.pop("content", None)
diff --git a/python/tests/message.py b/python/tests/message.py
index d044d638e7..84219bfe25 100644
--- a/python/tests/message.py
+++ b/python/tests/message.py
@@ -384,16 +384,12 @@ class MessageTests(TestBase):
self.assertEqual(reply.method.name, "ok")
msg = self.client.queue(tag).get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- # TODO: replace with below when we have batching
- if(i in [11, 12, 13, 15, 17, 19]):
+
+ if (i==13):
+ msg.ok(batchoffset=-2)
+ if(i in [15, 17, 19]):
msg.ok()
- #todo: when batching is available, test ack multiple
- #if(i == 13):
- # channel.message_ack(delivery_tag=reply.delivery_tag, multiple=True)
- #if(i in [15, 17, 19]):
- # channel.message_ack(delivery_tag=reply.delivery_tag)
-
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "empty")