summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-02-09 00:52:46 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-02-09 00:52:46 +0000
commitf197f0c88e1f5ed37a14617b1006f6579c4319e7 (patch)
tree63481b59b90c60e69550da9b95d268f36e1bcc84 /cpp/lib
parent5611851b2094372ba5cb77c93ba475e95ce76437 (diff)
downloadqpid-python-f197f0c88e1f5ed37a14617b1006f6579c4319e7.tar.gz
r1102@fuschia: andrew | 2007-02-09 00:52:04 +0000
Got ack working for the non batched case Small tidy up in broker Channel git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@505108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp2
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp21
-rw-r--r--cpp/lib/broker/BrokerChannel.h6
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp19
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.cpp9
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.h1
-rw-r--r--cpp/lib/common/framing/Requester.h3
7 files changed, 36 insertions, 25 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 625dda1480..bdf41266ce 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -280,7 +280,7 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
BasicMessage* msg = new BasicMessage(
&connection, exchangeName, routingKey, mandatory, immediate,
context.methodBody);
- channel.handlePublish(msg, exchange);
+ channel.handlePublish(msg);
}else{
throw ChannelException(
404, "Exchange not found '" + exchangeName + "'");
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 84ac747846..e0c5eebbec 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -140,7 +140,9 @@ void Channel::deliver(
{
Mutex::ScopedLock locker(deliveryLock);
- u_int64_t deliveryTag = currentDeliveryTag++;
+ // Key the delivered messages to the id of the request in which they're sent
+ u_int64_t deliveryTag = getNextSendRequestId();
+
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
outstanding.size += msg->contentSize();
@@ -188,24 +190,24 @@ void Channel::ConsumerImpl::requestDispatch(){
if(blocked) queue->dispatch();
}
-void Channel::handleInlineTransfer(
- Message::shared_ptr msg, Exchange::shared_ptr& exch)
+void Channel::handleInlineTransfer(Message::shared_ptr msg)
{
+ Exchange::shared_ptr exchange =
+ connection.broker.getExchanges().get(msg->getExchange());
if(transactional){
TxPublish* deliverable = new TxPublish(msg);
- exch->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
txBuffer.enlist(new DeletingTxOp(deliverable));
}else{
DeliverableMessage deliverable(msg);
- exch->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ exchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
}
}
// FIXME aconway 2007-02-05: Drop exchange member, calculate from
// message in ::complete().
-void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
+void Channel::handlePublish(Message* _message){
Message::shared_ptr message(_message);
- exchange = _exchange;
messageBuilder.initialise(message);
}
@@ -239,6 +241,11 @@ void Channel::complete(Message::shared_ptr msg) {
}
}
+// TODO astitcher 2007-02-08 This only deals correctly with non batched responses
+void Channel::ack(){
+ ack(getRequestInProgress(), false);
+}
+
void Channel::ack(u_int64_t deliveryTag, bool multiple){
if(transactional){
accumulatedAck.update(deliveryTag, multiple);
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index 6e906e7615..52d8a0abeb 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -91,7 +91,6 @@ class Channel : public framing::ChannelAdapter,
AccumulatedAck accumulatedAck;
MessageStore* const store;
MessageBuilder messageBuilder;//builder for in-progress message
- Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
bool opened;
boost::scoped_ptr<BrokerAdapter> adapter;
@@ -131,15 +130,16 @@ class Channel : public framing::ChannelAdapter,
void close();
void commit();
void rollback();
+ void ack();
void ack(u_int64_t deliveryTag, bool multiple);
void recover(bool requeue);
void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
- void handlePublish(Message* msg, Exchange::shared_ptr exchange);
+ void handlePublish(Message* msg);
void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
void handleContent(boost::shared_ptr<framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
- void handleInlineTransfer(Message::shared_ptr msg, Exchange::shared_ptr& exchange);
+ void handleInlineTransfer(Message::shared_ptr msg);
// For ChannelAdapter
void handleMethodInContext(
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 0d588aa351..7200027115 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -101,8 +101,9 @@ MessageHandlerImpl::consume(const MethodContext& context,
void
MessageHandlerImpl::empty( const MethodContext& )
{
- // FIXME astitcher 2007-01-11: 0-9 feature
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
+ // Shouldn't ever receive this as it is a response to get
+ // TODO astitcher 2007-02-09 What is the correct exception to throw here?
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
}
void
@@ -112,12 +113,9 @@ MessageHandlerImpl::get( const MethodContext& context,
const string& /*destination*/,
bool noAck )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
Queue::shared_ptr queue =
connection.getQueue(queueName, context.channel->getId());
- // FIXME: get is probably Basic specific
if(channel.get(queue, !noAck))
client.ok(context);
else
@@ -133,9 +131,9 @@ MessageHandlerImpl::offset(const MethodContext&,
}
void
-MessageHandlerImpl::ok( const MethodContext& )
+MessageHandlerImpl::ok(const MethodContext& /*context*/)
{
- // TODO: Need to ack the transfers acknowledged so far for flow control purp oses
+ channel.ack();
}
void
@@ -162,7 +160,6 @@ void
MessageHandlerImpl::recover(const MethodContext& context,
bool requeue)
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented");
channel.recover(requeue);
client.ok(context);
}
@@ -188,7 +185,7 @@ MessageHandlerImpl::resume(const MethodContext&,
void
MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
- const string& destination,
+ const string& /* destination */,
bool /*redelivered*/,
bool /*immediate*/,
u_int64_t /*ttl*/,
@@ -211,8 +208,6 @@ MessageHandlerImpl::transfer(const MethodContext& context,
qpid::framing::Content body,
bool /*mandatory*/)
{
- Exchange::shared_ptr exchange(
- broker.getExchanges().get(destination));
MessageTransferBody::shared_ptr transfer(
boost::shared_polymorphic_downcast<MessageTransferBody>(
context.methodBody));
@@ -220,7 +215,7 @@ MessageHandlerImpl::transfer(const MethodContext& context,
new MessageMessage(&connection, transfer));
if (body.isInline())
- channel.handleInlineTransfer(message, exchange);
+ channel.handleInlineTransfer(message);
else
references.get(body.getValue()).addMessage(message);
client.ok(context);
diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp
index 149c8144b4..53ab30faa0 100644
--- a/cpp/lib/common/framing/ChannelAdapter.cpp
+++ b/cpp/lib/common/framing/ChannelAdapter.cpp
@@ -55,7 +55,9 @@ void ChannelAdapter::send(AMQBody::shared_ptr body) {
void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
assertMethodOk(*request);
- responder.received(request->getData());
+ AMQRequestBody::Data& requestData = request->getData();
+ responder.received(requestData);
+ requestInProgress = requestData.requestId;
handleMethodInContext(request, MethodContext(this, request));
}
@@ -63,7 +65,10 @@ void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
assertMethodOk(*response);
// TODO aconway 2007-01-30: Consider a response handled on receipt.
// Review - any cases where this is not the case?
- requester.processed(response->getData());
+ AMQResponseBody::Data& responseData = response->getData();
+ requester.processed(responseData);
+ // For a response this is taken to be the request being responded to (for convenience)
+ requestInProgress = responseData.requestId;
handleMethod(response);
}
diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h
index 26cac76aae..c2eba2f4e9 100644
--- a/cpp/lib/common/framing/ChannelAdapter.h
+++ b/cpp/lib/common/framing/ChannelAdapter.h
@@ -87,6 +87,7 @@ class ChannelAdapter : public BodyHandler {
const MethodContext& context) = 0;
RequestId getRequestInProgress() { return requestInProgress; }
+ RequestId getNextSendRequestId() { return requester.getNextId(); }
private:
ChannelId id;
diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h
index 562ba681c1..dae5b1eaee 100644
--- a/cpp/lib/common/framing/Requester.h
+++ b/cpp/lib/common/framing/Requester.h
@@ -46,6 +46,9 @@ class Requester
/** Called after processing a response. */
void processed(const AMQResponseBody::Data&);
+ /** Get the next id to be used. */
+ RequestId getNextId() { return lastId + 1; }
+
private:
std::set<RequestId> requests; /** Sent but not responded to */
RequestId lastId;