summaryrefslogtreecommitdiff
path: root/cpp/src/client/MessageMessageChannel.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-04-05 19:16:09 +0000
committerAlan Conway <aconway@apache.org>2007-04-05 19:16:09 +0000
commitbb79efff2408de5f6cd66089cde8b8a82cc80cc2 (patch)
tree9d9c72158da31cebdd7ee538a11951b240922065 /cpp/src/client/MessageMessageChannel.cpp
parent2a1e4c9663ff0725c061248a96ebab763678fdd6 (diff)
downloadqpid-python-bb79efff2408de5f6cd66089cde8b8a82cc80cc2.tar.gz
* Exteneded use of shared pointers frame bodies across all send() commands.
* tests/Makefile.am: added check-unit target to run just unit tests. * Introduced make_shared_ptr convenience function for wrapping plain pointers with shared_ptr. * cpp/src/client/ClientChannel.h,cpp (sendsendAndReceive,sendAndReceiveSync): Pass shared_ptr instead of raw ptr to fix memory problems. Updated the following files to use make_shared_ptr - src/client/BasicMessageChannel.cpp - src/client/ClientConnection.cpp * src/client/MessageMessageChannel.cpp: implemented 0-9 message.get. * src/framing/Correlator.h,cpp: Allow request sender to register actions to take when the correlated response arrives. * cpp/src/tests/FramingTest.cpp: Added Correlator tests. * src/framing/ChannelAdapter.h,cpp: use Correlator to dispatch response actions. * cpp/src/shared_ptr.h (make_shared_ptr): Convenience function to make a shared pointer from a raw pointer. * cpp/src/tests/ClientChannelTest.cpp: Added message.get test. * cpp/src/tests/Makefile.am (check-unit): Added test-unit target to run unit tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525932 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/client/MessageMessageChannel.cpp')
-rw-r--r--cpp/src/client/MessageMessageChannel.cpp103
1 files changed, 84 insertions, 19 deletions
diff --git a/cpp/src/client/MessageMessageChannel.cpp b/cpp/src/client/MessageMessageChannel.cpp
index 25fbb95413..164a1cb426 100644
--- a/cpp/src/client/MessageMessageChannel.cpp
+++ b/cpp/src/client/MessageMessageChannel.cpp
@@ -25,6 +25,7 @@
#include "../framing/FieldTable.h"
#include "Connection.h"
#include "../shared_ptr.h"
+#include <boost/bind.hpp>
namespace qpid {
namespace client {
@@ -48,9 +49,9 @@ void MessageMessageChannel::consume(
if (tag.empty())
tag = newTag();
channel.sendAndReceive<MessageOkBody>(
- new MessageConsumeBody(
+ make_shared_ptr(new MessageConsumeBody(
channel.getVersion(), 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, fields ? *fields : FieldTable()));
+ ackMode == NO_ACK, false, fields ? *fields : FieldTable())));
// // FIXME aconway 2007-02-20: Race condition!
// // We could receive the first message for the consumer
@@ -115,16 +116,44 @@ void MessageMessageChannel::close(){
*/
const string getDestinationId("__get__");
+/**
+ * A destination that provides a Correlator::Action to handle
+ * MessageEmpty responses.
+ */
+struct MessageGetDestination : public IncomingMessage::WaitableDestination
+{
+ void response(shared_ptr<AMQResponseBody> response) {
+ if (response->amqpClassId() == MessageOkBody::CLASS_ID) {
+ switch (response->amqpMethodId()) {
+ case MessageOkBody::METHOD_ID:
+ // Nothing to do, wait for transfer.
+ return;
+ case MessageEmptyBody::METHOD_ID:
+ empty(); // Wake up waiter with empty queue.
+ return;
+ }
+ }
+ throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response");
+ }
+
+ Correlator::Action action() {
+ return boost::bind(&MessageGetDestination::response, this, _1);
+ }
+};
+
bool MessageMessageChannel::get(
- Message& , const Queue& , AckMode )
+ Message& msg, const Queue& queue, AckMode ackMode)
{
Mutex::ScopedLock l(lock);
-// incoming.addDestination(getDestinationId, getDest);
-// channel.send(
-// new MessageGetBody(
-// channel.version, 0, queue.getName(), getDestinationId, ackMode));
-// return getDest.wait(msg);
- return false;
+ std::string destName=newTag();
+ MessageGetDestination dest;
+ incoming.addDestination(destName, dest);
+ channel.send(
+ make_shared_ptr(
+ new MessageGetBody(
+ channel.version, 0, queue.getName(), destName, ackMode)),
+ dest.action());
+ return dest.wait(msg);
}
@@ -176,9 +205,30 @@ void MessageMessageChannel::publish(
// FIXME aconway 2007-02-23:
throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented");
}
- channel.sendAndReceive<MessageOkBody>(transfer.get());
+ channel.sendAndReceive<MessageOkBody>(transfer);
}
+void copy(Message& msg, MessageTransferBody& transfer) {
+ // FIXME aconway 2007-04-05: Verify all required fields
+ // are copied.
+ msg.setContentType(transfer.getContentType());
+ msg.setContentEncoding(transfer.getContentEncoding());
+ msg.setHeaders(transfer.getApplicationHeaders());
+ msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode()));
+ msg.setPriority(transfer.getPriority());
+ msg.setCorrelationId(transfer.getCorrelationId());
+ msg.setReplyTo(transfer.getReplyTo());
+ // FIXME aconway 2007-04-05: TTL/Expiration
+ msg.setMessageId(transfer.getMessageId());
+ msg.setTimestamp(transfer.getTimestamp());
+ msg.setUserId(transfer.getUserId());
+ msg.setAppId(transfer.getAppId());
+ msg.setDestination(transfer.getDestination());
+ msg.setRedelivered(transfer.getRedelivered());
+ msg.setDeliveryTag(0); // No meaning in 0-9
+ if (transfer.getBody().isInline())
+ msg.setData(transfer.getBody().getValue());
+}
void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID);
@@ -203,23 +253,38 @@ void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
break;
}
- case MessageEmptyBody::METHOD_ID: {
- // FIXME aconway 2007-04-04:
- // getDest.empty();
+ case MessageTransferBody::METHOD_ID: {
+ MessageTransferBody::shared_ptr transfer=
+ shared_polymorphic_downcast<MessageTransferBody>(method);
+ if (transfer->getBody().isInline()) {
+ Message msg;
+ copy(msg, *transfer);
+ // Deliver it.
+ incoming.getDestination(transfer->getDestination()).message(msg);
+ }
+ else {
+ Message& msg=incoming.createMessage(
+ transfer->getDestination(), transfer->getBody().getValue());
+ copy(msg, *transfer);
+ // Will be delivered when reference closes.
+ }
break;
}
- case MessageCancelBody::METHOD_ID:
- case MessageCheckpointBody::METHOD_ID:
+ case MessageEmptyBody::METHOD_ID:
+ case MessageOkBody::METHOD_ID:
+ // Nothing to do
+ break;
// FIXME aconway 2007-04-03: TODO
- case MessageOkBody::METHOD_ID:
+ case MessageCancelBody::METHOD_ID:
+ case MessageCheckpointBody::METHOD_ID:
case MessageOffsetBody::METHOD_ID:
case MessageQosBody::METHOD_ID:
case MessageRecoverBody::METHOD_ID:
case MessageRejectBody::METHOD_ID:
case MessageResumeBody::METHOD_ID:
- case MessageTransferBody::METHOD_ID:
+ break;
default:
throw Channel::UnknownMethod();
}
@@ -322,10 +387,10 @@ void MessageMessageChannel::setReturnedMessageHandler(
void MessageMessageChannel::setQos(){
channel.sendAndReceive<MessageOkBody>(
- new MessageQosBody(channel.version, 0, channel.getPrefetch(), false));
+ make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)));
if(channel.isTransactional())
channel.sendAndReceive<TxSelectOkBody>(
- new TxSelectBody(channel.version));
+ make_shared_ptr(new TxSelectBody(channel.version)));
}
}} // namespace qpid::client