diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-02-09 00:52:46 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-02-09 00:52:46 +0000 |
commit | f197f0c88e1f5ed37a14617b1006f6579c4319e7 (patch) | |
tree | 63481b59b90c60e69550da9b95d268f36e1bcc84 /cpp/lib | |
parent | 5611851b2094372ba5cb77c93ba475e95ce76437 (diff) | |
download | qpid-python-f197f0c88e1f5ed37a14617b1006f6579c4319e7.tar.gz |
r1102@fuschia: andrew | 2007-02-09 00:52:04 +0000
Got ack working for the non batched case
Small tidy up in broker Channel
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@505108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 2 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 21 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 6 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 19 | ||||
-rw-r--r-- | cpp/lib/common/framing/ChannelAdapter.cpp | 9 | ||||
-rw-r--r-- | cpp/lib/common/framing/ChannelAdapter.h | 1 | ||||
-rw-r--r-- | cpp/lib/common/framing/Requester.h | 3 |
7 files changed, 36 insertions, 25 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 625dda1480..bdf41266ce 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -280,7 +280,7 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( BasicMessage* msg = new BasicMessage( &connection, exchangeName, routingKey, mandatory, immediate, context.methodBody); - channel.handlePublish(msg, exchange); + channel.handlePublish(msg); }else{ throw ChannelException( 404, "Exchange not found '" + exchangeName + "'"); diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 84ac747846..e0c5eebbec 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -140,7 +140,9 @@ void Channel::deliver( { Mutex::ScopedLock locker(deliveryLock); - u_int64_t deliveryTag = currentDeliveryTag++; + // Key the delivered messages to the id of the request in which they're sent + u_int64_t deliveryTag = getNextSendRequestId(); + if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); outstanding.size += msg->contentSize(); @@ -188,24 +190,24 @@ void Channel::ConsumerImpl::requestDispatch(){ if(blocked) queue->dispatch(); } -void Channel::handleInlineTransfer( - Message::shared_ptr msg, Exchange::shared_ptr& exch) +void Channel::handleInlineTransfer(Message::shared_ptr msg) { + Exchange::shared_ptr exchange = + connection.broker.getExchanges().get(msg->getExchange()); if(transactional){ TxPublish* deliverable = new TxPublish(msg); - exch->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); txBuffer.enlist(new DeletingTxOp(deliverable)); }else{ DeliverableMessage deliverable(msg); - exch->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); } } // FIXME aconway 2007-02-05: Drop exchange member, calculate from // message in ::complete(). -void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){ +void Channel::handlePublish(Message* _message){ Message::shared_ptr message(_message); - exchange = _exchange; messageBuilder.initialise(message); } @@ -239,6 +241,11 @@ void Channel::complete(Message::shared_ptr msg) { } } +// TODO astitcher 2007-02-08 This only deals correctly with non batched responses +void Channel::ack(){ + ack(getRequestInProgress(), false); +} + void Channel::ack(u_int64_t deliveryTag, bool multiple){ if(transactional){ accumulatedAck.update(deliveryTag, multiple); diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 6e906e7615..52d8a0abeb 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -91,7 +91,6 @@ class Channel : public framing::ChannelAdapter, AccumulatedAck accumulatedAck; MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message - Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to bool opened; boost::scoped_ptr<BrokerAdapter> adapter; @@ -131,15 +130,16 @@ class Channel : public framing::ChannelAdapter, void close(); void commit(); void rollback(); + void ack(); void ack(u_int64_t deliveryTag, bool multiple); void recover(bool requeue); void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag); - void handlePublish(Message* msg, Exchange::shared_ptr exchange); + void handlePublish(Message* msg); void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>); void handleContent(boost::shared_ptr<framing::AMQContentBody>); void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>); - void handleInlineTransfer(Message::shared_ptr msg, Exchange::shared_ptr& exchange); + void handleInlineTransfer(Message::shared_ptr msg); // For ChannelAdapter void handleMethodInContext( diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 0d588aa351..7200027115 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -101,8 +101,9 @@ MessageHandlerImpl::consume(const MethodContext& context, void MessageHandlerImpl::empty( const MethodContext& ) { - // FIXME astitcher 2007-01-11: 0-9 feature - THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); + // Shouldn't ever receive this as it is a response to get + // TODO astitcher 2007-02-09 What is the correct exception to throw here? + THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible"); } void @@ -112,12 +113,9 @@ MessageHandlerImpl::get( const MethodContext& context, const string& /*destination*/, bool noAck ) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - Queue::shared_ptr queue = connection.getQueue(queueName, context.channel->getId()); - // FIXME: get is probably Basic specific if(channel.get(queue, !noAck)) client.ok(context); else @@ -133,9 +131,9 @@ MessageHandlerImpl::offset(const MethodContext&, } void -MessageHandlerImpl::ok( const MethodContext& ) +MessageHandlerImpl::ok(const MethodContext& /*context*/) { - // TODO: Need to ack the transfers acknowledged so far for flow control purp oses + channel.ack(); } void @@ -162,7 +160,6 @@ void MessageHandlerImpl::recover(const MethodContext& context, bool requeue) { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented"); channel.recover(requeue); client.ok(context); } @@ -188,7 +185,7 @@ MessageHandlerImpl::resume(const MethodContext&, void MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, - const string& destination, + const string& /* destination */, bool /*redelivered*/, bool /*immediate*/, u_int64_t /*ttl*/, @@ -211,8 +208,6 @@ MessageHandlerImpl::transfer(const MethodContext& context, qpid::framing::Content body, bool /*mandatory*/) { - Exchange::shared_ptr exchange( - broker.getExchanges().get(destination)); MessageTransferBody::shared_ptr transfer( boost::shared_polymorphic_downcast<MessageTransferBody>( context.methodBody)); @@ -220,7 +215,7 @@ MessageHandlerImpl::transfer(const MethodContext& context, new MessageMessage(&connection, transfer)); if (body.isInline()) - channel.handleInlineTransfer(message, exchange); + channel.handleInlineTransfer(message); else references.get(body.getValue()).addMessage(message); client.ok(context); diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp index 149c8144b4..53ab30faa0 100644 --- a/cpp/lib/common/framing/ChannelAdapter.cpp +++ b/cpp/lib/common/framing/ChannelAdapter.cpp @@ -55,7 +55,9 @@ void ChannelAdapter::send(AMQBody::shared_ptr body) { void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { assertMethodOk(*request); - responder.received(request->getData()); + AMQRequestBody::Data& requestData = request->getData(); + responder.received(requestData); + requestInProgress = requestData.requestId; handleMethodInContext(request, MethodContext(this, request)); } @@ -63,7 +65,10 @@ void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { assertMethodOk(*response); // TODO aconway 2007-01-30: Consider a response handled on receipt. // Review - any cases where this is not the case? - requester.processed(response->getData()); + AMQResponseBody::Data& responseData = response->getData(); + requester.processed(responseData); + // For a response this is taken to be the request being responded to (for convenience) + requestInProgress = responseData.requestId; handleMethod(response); } diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h index 26cac76aae..c2eba2f4e9 100644 --- a/cpp/lib/common/framing/ChannelAdapter.h +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -87,6 +87,7 @@ class ChannelAdapter : public BodyHandler { const MethodContext& context) = 0; RequestId getRequestInProgress() { return requestInProgress; } + RequestId getNextSendRequestId() { return requester.getNextId(); } private: ChannelId id; diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h index 562ba681c1..dae5b1eaee 100644 --- a/cpp/lib/common/framing/Requester.h +++ b/cpp/lib/common/framing/Requester.h @@ -46,6 +46,9 @@ class Requester /** Called after processing a response. */ void processed(const AMQResponseBody::Data&); + /** Get the next id to be used. */ + RequestId getNextId() { return lastId + 1; } + private: std::set<RequestId> requests; /** Sent but not responded to */ RequestId lastId; |