summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-02-09 02:16:03 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-02-09 02:16:03 +0000
commit8cf4a0b8ebaa6075a1f083a294b1fee90bd4d196 (patch)
tree77670233364cef083ef40b9878297a64c16a160b /cpp
parentf197f0c88e1f5ed37a14617b1006f6579c4319e7 (diff)
downloadqpid-python-8cf4a0b8ebaa6075a1f083a294b1fee90bd4d196.tar.gz
r1104@fuschia: andrew | 2007-02-09 02:14:00 +0000
Initial implementation of Message.get delivery git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@505139 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp2
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp7
-rw-r--r--cpp/lib/broker/BrokerChannel.h2
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp3
-rw-r--r--cpp/lib/broker/BrokerMessage.h1
-rw-r--r--cpp/lib/broker/BrokerMessageBase.h1
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.cpp30
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.h1
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp4
9 files changed, 40 insertions, 11 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index bdf41266ce..012ec052e8 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -289,7 +289,7 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
+ if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
string clusterId;//not used, part of an imatix hack
connection.client->getBasic().getEmpty(context, clusterId);
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index e0c5eebbec..cae48273ff 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -204,8 +204,6 @@ void Channel::handleInlineTransfer(Message::shared_ptr msg)
}
}
-// FIXME aconway 2007-02-05: Drop exchange member, calculate from
-// message in ::complete().
void Channel::handlePublish(Message* _message){
Message::shared_ptr message(_message);
messageBuilder.initialise(message);
@@ -292,12 +290,13 @@ void Channel::recover(bool requeue){
}
}
-bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){
Message::shared_ptr msg = queue->dequeue();
if(msg){
Mutex::ScopedLock locker(deliveryLock);
- u_int64_t myDeliveryTag = currentDeliveryTag++;
+ u_int64_t myDeliveryTag = getNextSendRequestId();
msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
+ destination,
queue->getMessageCount() + 1, myDeliveryTag,
framesize);
if(ackExpected){
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index 52d8a0abeb..8ad5d691e6 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -125,7 +125,7 @@ class Channel : public framing::ChannelAdapter,
bool exclusive, ConnectionToken* const connection = 0,
const framing::FieldTable* = 0);
void cancel(const string& tag);
- bool get(Queue::shared_ptr queue, bool ackExpected);
+ bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
void begin();
void close();
void commit();
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 76417056cb..e8a993942a 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -89,7 +89,8 @@ void BasicMessage::deliver(ChannelAdapter& channel,
sendContent(channel, framesize);
}
-void BasicMessage::sendGetOk(const MethodContext& context,
+void BasicMessage::sendGetOk(const MethodContext& context,
+ const std::string& /*destination*/,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize)
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index 308fcc1791..9e77eab446 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -78,6 +78,7 @@ class BasicMessage : public Message {
u_int32_t framesize);
void sendGetOk(const framing::MethodContext&,
+ const std::string& destination,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize);
diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h
index 32767191ca..41a0cd45fa 100644
--- a/cpp/lib/broker/BrokerMessageBase.h
+++ b/cpp/lib/broker/BrokerMessageBase.h
@@ -112,6 +112,7 @@ class Message{
* Used to return a message in response to a get from a queue
*/
virtual void sendGetOk(const framing::MethodContext& context,
+ const std::string& destination,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize) = 0;
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp
index 153acb9ab5..7a5943bc66 100644
--- a/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -79,12 +79,38 @@ void MessageMessage::deliver(
}
void MessageMessage::sendGetOk(
- const framing::MethodContext& /*context*/,
+ const framing::MethodContext& context,
+ const std::string& destination,
u_int32_t /*messageCount*/,
u_int64_t /*deliveryTag*/,
u_int32_t /*framesize*/)
{
- // FIXME aconway 2007-02-05:
+ framing::ChannelAdapter* channel = context.channel;
+ channel->send(
+ new MessageTransferBody(channel->getVersion(),
+ transfer->getTicket(),
+ destination,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ transfer->getBody(),
+ transfer->getMandatory()));
}
bool MessageMessage::isComplete()
diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h
index fb0a4749d4..89289a7fd0 100644
--- a/cpp/lib/broker/BrokerMessageMessage.h
+++ b/cpp/lib/broker/BrokerMessageMessage.h
@@ -60,6 +60,7 @@ class MessageMessage: public Message{
u_int32_t framesize);
void sendGetOk(const framing::MethodContext& context,
+ const std::string& destination,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize);
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 7200027115..797e3fbbf9 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -110,13 +110,13 @@ void
MessageHandlerImpl::get( const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
- const string& /*destination*/,
+ const string& destination,
bool noAck )
{
Queue::shared_ptr queue =
connection.getQueue(queueName, context.channel->getId());
- if(channel.get(queue, !noAck))
+ if(channel.get(queue, destination, !noAck))
client.ok(context);
else
client.empty(context);