diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-02-09 02:16:03 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-02-09 02:16:03 +0000 |
commit | 8cf4a0b8ebaa6075a1f083a294b1fee90bd4d196 (patch) | |
tree | 77670233364cef083ef40b9878297a64c16a160b /cpp | |
parent | f197f0c88e1f5ed37a14617b1006f6579c4319e7 (diff) | |
download | qpid-python-8cf4a0b8ebaa6075a1f083a294b1fee90bd4d196.tar.gz |
r1104@fuschia: andrew | 2007-02-09 02:14:00 +0000
Initial implementation of Message.get delivery
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@505139 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 2 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 7 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 2 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 3 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.h | 1 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageBase.h | 1 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 30 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.h | 1 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 4 |
9 files changed, 40 insertions, 11 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index bdf41266ce..012ec052e8 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -289,7 +289,7 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if(!connection.getChannel(channel.getId()).get(queue, !noAck)){ + if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){ string clusterId;//not used, part of an imatix hack connection.client->getBasic().getEmpty(context, clusterId); diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index e0c5eebbec..cae48273ff 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -204,8 +204,6 @@ void Channel::handleInlineTransfer(Message::shared_ptr msg) } } -// FIXME aconway 2007-02-05: Drop exchange member, calculate from -// message in ::complete(). void Channel::handlePublish(Message* _message){ Message::shared_ptr message(_message); messageBuilder.initialise(message); @@ -292,12 +290,13 @@ void Channel::recover(bool requeue){ } } -bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ +bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){ Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); - u_int64_t myDeliveryTag = currentDeliveryTag++; + u_int64_t myDeliveryTag = getNextSendRequestId(); msg->sendGetOk(MethodContext(this, msg->getRespondTo()), + destination, queue->getMessageCount() + 1, myDeliveryTag, framesize); if(ackExpected){ diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 52d8a0abeb..8ad5d691e6 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -125,7 +125,7 @@ class Channel : public framing::ChannelAdapter, bool exclusive, ConnectionToken* const connection = 0, const framing::FieldTable* = 0); void cancel(const string& tag); - bool get(Queue::shared_ptr queue, bool ackExpected); + bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected); void begin(); void close(); void commit(); diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 76417056cb..e8a993942a 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -89,7 +89,8 @@ void BasicMessage::deliver(ChannelAdapter& channel, sendContent(channel, framesize); } -void BasicMessage::sendGetOk(const MethodContext& context, +void BasicMessage::sendGetOk(const MethodContext& context, + const std::string& /*destination*/, u_int32_t messageCount, u_int64_t deliveryTag, u_int32_t framesize) diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 308fcc1791..9e77eab446 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -78,6 +78,7 @@ class BasicMessage : public Message { u_int32_t framesize); void sendGetOk(const framing::MethodContext&, + const std::string& destination, u_int32_t messageCount, u_int64_t deliveryTag, u_int32_t framesize); diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h index 32767191ca..41a0cd45fa 100644 --- a/cpp/lib/broker/BrokerMessageBase.h +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -112,6 +112,7 @@ class Message{ * Used to return a message in response to a get from a queue */ virtual void sendGetOk(const framing::MethodContext& context, + const std::string& destination, u_int32_t messageCount, u_int64_t deliveryTag, u_int32_t framesize) = 0; diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index 153acb9ab5..7a5943bc66 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -79,12 +79,38 @@ void MessageMessage::deliver( } void MessageMessage::sendGetOk( - const framing::MethodContext& /*context*/, + const framing::MethodContext& context, + const std::string& destination, u_int32_t /*messageCount*/, u_int64_t /*deliveryTag*/, u_int32_t /*framesize*/) { - // FIXME aconway 2007-02-05: + framing::ChannelAdapter* channel = context.channel; + channel->send( + new MessageTransferBody(channel->getVersion(), + transfer->getTicket(), + destination, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + transfer->getBody(), + transfer->getMandatory())); } bool MessageMessage::isComplete() diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index fb0a4749d4..89289a7fd0 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -60,6 +60,7 @@ class MessageMessage: public Message{ u_int32_t framesize); void sendGetOk(const framing::MethodContext& context, + const std::string& destination, u_int32_t messageCount, u_int64_t deliveryTag, u_int32_t framesize); diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 7200027115..797e3fbbf9 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -110,13 +110,13 @@ void MessageHandlerImpl::get( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, - const string& /*destination*/, + const string& destination, bool noAck ) { Queue::shared_ptr queue = connection.getQueue(queueName, context.channel->getId()); - if(channel.get(queue, !noAck)) + if(channel.get(queue, destination, !noAck)) client.ok(context); else client.empty(context); |