diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-02-13 21:52:30 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-02-13 21:52:30 +0000 |
commit | 5d8e8d39e1e5e13d0753c53a8095f075895d01a1 (patch) | |
tree | 727d30e03ae1679827779560a32f11c12f32d4a5 | |
parent | 9517deedff9691dbe3429b0b917dfd4208b0b1b8 (diff) | |
download | qpid-python-5d8e8d39e1e5e13d0753c53a8095f075895d01a1.tar.gz |
r1111@fuschia: andrew | 2007-02-09 15:51:10 +0000
Removed currently unused request tracking logic
r1125@fuschia: andrew | 2007-02-13 21:51:30 +0000
Implemented receiveing batched Message.ok in c++ broker
Implemented batched response frames in python client code
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507249 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/lib/broker/AccumulatedAck.cpp | 12 | ||||
-rw-r--r-- | cpp/lib/broker/AccumulatedAck.h | 2 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 6 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 29 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 3 | ||||
-rw-r--r-- | cpp/lib/common/framing/ChannelAdapter.cpp | 3 | ||||
-rw-r--r-- | cpp/lib/common/framing/ChannelAdapter.h | 4 | ||||
-rw-r--r-- | cpp/lib/common/framing/Requester.cpp | 15 | ||||
-rw-r--r-- | cpp/lib/common/framing/Requester.h | 9 | ||||
-rw-r--r-- | python/qpid/message.py | 9 | ||||
-rw-r--r-- | python/qpid/peer.py | 13 | ||||
-rw-r--r-- | python/tests/message.py | 12 |
12 files changed, 59 insertions, 58 deletions
diff --git a/cpp/lib/broker/AccumulatedAck.cpp b/cpp/lib/broker/AccumulatedAck.cpp index a9826ba5ea..2f2a7464c1 100644 --- a/cpp/lib/broker/AccumulatedAck.cpp +++ b/cpp/lib/broker/AccumulatedAck.cpp @@ -24,12 +24,12 @@ using std::less_equal; using std::bind2nd; using namespace qpid::broker; -void AccumulatedAck::update(u_int64_t tag, bool multiple){ - if(multiple){ - if(tag > range) range = tag; - //else don't care, it is already counted - }else if(tag > range){ - individual.push_back(tag); +void AccumulatedAck::update(u_int64_t firstTag, u_int64_t lastTag){ + if (firstTag-1 == range) { + range = lastTag; + } else { + for (u_int64_t tag = firstTag; tag<=lastTag; tag++) + individual.push_back(tag); } } diff --git a/cpp/lib/broker/AccumulatedAck.h b/cpp/lib/broker/AccumulatedAck.h index 055c8ea3e0..6ab0cfbe2e 100644 --- a/cpp/lib/broker/AccumulatedAck.h +++ b/cpp/lib/broker/AccumulatedAck.h @@ -43,7 +43,7 @@ namespace qpid { */ std::list<u_int64_t> individual; - void update(u_int64_t tag, bool multiple); + void update(u_int64_t firstTag, u_int64_t lastTag); void consolidate(); void clear(); bool covers(u_int64_t tag) const; diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index ec80241c66..49f0f24407 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -326,11 +326,7 @@ void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_ } void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ - try{ - channel.ack(deliveryTag, multiple); - }catch(InvalidAckException& e){ - throw ConnectionException(530, "Received ack for unrecognised delivery tag"); - } + channel.ack(deliveryTag, multiple); } void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 74e5504f17..674d0e9505 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -235,24 +235,37 @@ void Channel::complete(Message::shared_ptr msg) { // TODO astitcher 2007-02-08 This only deals correctly with non batched responses void Channel::ack(){ - ack(getRequestInProgress(), false); + ack(getFirstAckRequest(), getLastAckRequest()); } -void Channel::ack(u_int64_t deliveryTag, bool multiple) -{ +// Used by Basic +void Channel::ack(u_int64_t deliveryTag, bool multiple){ + if (multiple) + ack(0, deliveryTag); + else + ack(deliveryTag, deliveryTag); +} + +void Channel::ack(u_int64_t firstTag, u_int64_t lastTag){ if(transactional){ - accumulatedAck.update(deliveryTag, multiple); + //FIXME astitcher This only works for Basic style acks + accumulatedAck.update(lastTag, lastTag); + //TODO: I think the outstanding prefetch size & count should be updated at this point... //TODO: ...this may then necessitate dispatching to consumers }else{ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); + ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag)); + ack_iterator j = (firstTag == 0) ? + unacked.begin() : + find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); + if(i == unacked.end()){ - throw InvalidAckException(); - }else if(multiple){ + throw ConnectionException(530, "Received ack for unrecognised delivery tag"); + }else if(i!=j){ ack_iterator end = ++i; - for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard)); + for_each(j, end, mem_fun_ref(&DeliveryRecord::discard)); unacked.erase(unacked.begin(), end); //recalculate the prefetch: diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 538e86b0a8..1a1c4dabba 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -138,6 +138,7 @@ class Channel : public framing::ChannelAdapter, void rollback(); void ack(); void ack(u_int64_t deliveryTag, bool multiple); + void ack(u_int64_t deliveryTag, u_int64_t endTag); void recover(bool requeue); void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag); void handlePublish(Message* msg); @@ -153,8 +154,6 @@ class Channel : public framing::ChannelAdapter, const framing::MethodContext& context); }; -struct InvalidAckException{}; - }} // namespace broker diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp index 40241660f2..8a1ff39ee5 100644 --- a/cpp/lib/common/framing/ChannelAdapter.cpp +++ b/cpp/lib/common/framing/ChannelAdapter.cpp @@ -61,7 +61,6 @@ void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { assertMethodOk(*request); AMQRequestBody::Data& requestData = request->getData(); responder.received(requestData); - requestInProgress = requestData.requestId; handleMethodInContext(request, MethodContext(this, request)); } @@ -71,8 +70,6 @@ void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { // Review - any cases where this is not the case? AMQResponseBody::Data& responseData = response->getData(); requester.processed(responseData); - // For a response this is taken to be the request being responded to (for convenience) - requestInProgress = responseData.requestId; handleMethod(response); } diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h index 9f654d9a5b..36362a417a 100644 --- a/cpp/lib/common/framing/ChannelAdapter.h +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -85,7 +85,8 @@ class ChannelAdapter : public BodyHandler { boost::shared_ptr<qpid::framing::AMQMethodBody> method, const MethodContext& context) = 0; - RequestId getRequestInProgress() { return requestInProgress; } + RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); } + RequestId getLastAckRequest() { return requester.getLastAckRequest(); } RequestId getNextSendRequestId() { return requester.getNextId(); } private: @@ -94,7 +95,6 @@ class ChannelAdapter : public BodyHandler { ProtocolVersion version; Requester requester; Responder responder; - RequestId requestInProgress; }; }} diff --git a/cpp/lib/common/framing/Requester.cpp b/cpp/lib/common/framing/Requester.cpp index 37b2d37c86..9ee809e2ee 100644 --- a/cpp/lib/common/framing/Requester.cpp +++ b/cpp/lib/common/framing/Requester.cpp @@ -29,23 +29,12 @@ Requester::Requester() : lastId(0), responseMark(0) {} void Requester::sending(AMQRequestBody::Data& request) { request.requestId = ++lastId; request.responseMark = responseMark; - requests.insert(request.requestId); } void Requester::processed(const AMQResponseBody::Data& response) { responseMark = response.responseId; - RequestId id = response.requestId; - RequestId end = id + response.batchOffset + 1; - for ( ; id < end; ++id) { - std::set<RequestId>::iterator i = requests.find(id); - if (i != requests.end()) - requests.erase(i); - else { - THROW_QPID_ERROR( - PROTOCOL_ERROR, - boost::format("Response with non-existent request id=%d")%id); - } - } + firstAckRequest = response.requestId; + lastAckRequest = firstAckRequest + response.batchOffset; } }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h index dae5b1eaee..dcc4460041 100644 --- a/cpp/lib/common/framing/Requester.h +++ b/cpp/lib/common/framing/Requester.h @@ -46,13 +46,18 @@ class Requester /** Called after processing a response. */ void processed(const AMQResponseBody::Data&); - /** Get the next id to be used. */ + /** Get the next request id to be used. */ RequestId getNextId() { return lastId + 1; } + /** Get the first request acked by this response */ + RequestId getFirstAckRequest() { return firstAckRequest; } + /** Get the last request acked by this response */ + RequestId getLastAckRequest() { return lastAckRequest; } private: - std::set<RequestId> requests; /** Sent but not responded to */ RequestId lastId; ResponseId responseMark; + ResponseId firstAckRequest; + ResponseId lastAckRequest; }; }} // namespace qpid::framing diff --git a/python/qpid/message.py b/python/qpid/message.py index 29c8654937..f80293180e 100644 --- a/python/qpid/message.py +++ b/python/qpid/message.py @@ -47,9 +47,12 @@ class Message: else: for r in self.method.responses: if attr == r.name: - result = lambda *args, **kwargs: \ - self.channel.respond(Method(r, r.arguments(*args, **kwargs)), - self.frame) + def respond(*args, **kwargs): + batch=0 + if kwargs.has_key("batchoffset"): + batch=kwargs.pop("batchoffset") + self.channel.respond(Method(r, r.arguments(*args, **kwargs)), batch, self.frame) + result = respond break else: raise AttributeError(attr) diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 9f9644f17d..3f7be699c2 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -155,12 +155,15 @@ class Responder: self.write = writer self.sequence = Sequence(1) - def respond(self, method, request): + def respond(self, method, batch, request): if isinstance(request, Method): self.write(method) else: - # XXX: batching - frame = Response(self.sequence.next(), request.id, 0, method) + # allow batching from frame at either end + if batch<0: + frame = Response(self.sequence.next(), request.id+batch, -batch, method) + else: + frame = Response(self.sequence.next(), request.id, batch, method) self.write(frame) class Closed(Exception): pass @@ -237,8 +240,8 @@ class Channel: def request(self, method, listener, content = None): self.requester.request(method, listener, content) - def respond(self, method, request): - self.responder.respond(method, request) + def respond(self, method, batch, request): + self.responder.respond(method, batch, request) def invoke(self, type, args, kwargs): content = kwargs.pop("content", None) diff --git a/python/tests/message.py b/python/tests/message.py index d044d638e7..84219bfe25 100644 --- a/python/tests/message.py +++ b/python/tests/message.py @@ -384,16 +384,12 @@ class MessageTests(TestBase): self.assertEqual(reply.method.name, "ok") msg = self.client.queue(tag).get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - # TODO: replace with below when we have batching - if(i in [11, 12, 13, 15, 17, 19]): + + if (i==13): + msg.ok(batchoffset=-2) + if(i in [15, 17, 19]): msg.ok() - #todo: when batching is available, test ack multiple - #if(i == 13): - # channel.message_ack(delivery_tag=reply.delivery_tag, multiple=True) - #if(i in [15, 17, 19]): - # channel.message_ack(delivery_tag=reply.delivery_tag) - reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "empty") |