summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-04-05 19:16:09 +0000
committerAlan Conway <aconway@apache.org>2007-04-05 19:16:09 +0000
commitbb79efff2408de5f6cd66089cde8b8a82cc80cc2 (patch)
tree9d9c72158da31cebdd7ee538a11951b240922065 /cpp
parent2a1e4c9663ff0725c061248a96ebab763678fdd6 (diff)
downloadqpid-python-bb79efff2408de5f6cd66089cde8b8a82cc80cc2.tar.gz
* Exteneded use of shared pointers frame bodies across all send() commands.
* tests/Makefile.am: added check-unit target to run just unit tests. * Introduced make_shared_ptr convenience function for wrapping plain pointers with shared_ptr. * cpp/src/client/ClientChannel.h,cpp (sendsendAndReceive,sendAndReceiveSync): Pass shared_ptr instead of raw ptr to fix memory problems. Updated the following files to use make_shared_ptr - src/client/BasicMessageChannel.cpp - src/client/ClientConnection.cpp * src/client/MessageMessageChannel.cpp: implemented 0-9 message.get. * src/framing/Correlator.h,cpp: Allow request sender to register actions to take when the correlated response arrives. * cpp/src/tests/FramingTest.cpp: Added Correlator tests. * src/framing/ChannelAdapter.h,cpp: use Correlator to dispatch response actions. * cpp/src/shared_ptr.h (make_shared_ptr): Convenience function to make a shared pointer from a raw pointer. * cpp/src/tests/ClientChannelTest.cpp: Added message.get test. * cpp/src/tests/Makefile.am (check-unit): Added test-unit target to run unit tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525932 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Makefile.am3
-rw-r--r--cpp/src/client/BasicMessageChannel.cpp10
-rw-r--r--cpp/src/client/ClientChannel.cpp35
-rw-r--r--cpp/src/client/ClientChannel.h11
-rw-r--r--cpp/src/client/ClientConnection.cpp4
-rw-r--r--cpp/src/client/ClientMessage.h2
-rw-r--r--cpp/src/client/MessageMessageChannel.cpp103
-rw-r--r--cpp/src/framing/ChannelAdapter.cpp22
-rw-r--r--cpp/src/framing/ChannelAdapter.h28
-rw-r--r--cpp/src/framing/Correlator.cpp42
-rw-r--r--cpp/src/framing/Correlator.h68
-rw-r--r--cpp/src/framing/Requester.h17
-rw-r--r--cpp/src/shared_ptr.h10
-rw-r--r--cpp/src/tests/ClientChannelTest.cpp11
-rw-r--r--cpp/src/tests/FramingTest.cpp53
-rw-r--r--cpp/src/tests/Makefile.am3
16 files changed, 345 insertions, 77 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 00e31a7d1a..50f271697d 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -92,10 +92,11 @@ libqpidcommon_la_SOURCES = \
$(framing)/ProtocolVersionException.cpp \
$(framing)/Requester.cpp \
$(framing)/Responder.cpp \
+ $(framing)/Correlator.cpp \
$(framing)/Value.cpp \
$(framing)/Proxy.cpp \
$(gen)/AMQP_ClientProxy.cpp \
- $(gen)/AMQP_HighestVersion.h \
+ $(gen)/AMQP_HighestVersion.h \
$(gen)/AMQP_MethodVersionMap.cpp \
$(gen)/AMQP_ServerProxy.cpp \
Exception.cpp \
diff --git a/cpp/src/client/BasicMessageChannel.cpp b/cpp/src/client/BasicMessageChannel.cpp
index 9e3d184673..c577c0a305 100644
--- a/cpp/src/client/BasicMessageChannel.cpp
+++ b/cpp/src/client/BasicMessageChannel.cpp
@@ -81,10 +81,10 @@ void BasicMessageChannel::consume(
BasicConsumeOkBody::shared_ptr ok =
channel.sendAndReceiveSync<BasicConsumeOkBody>(
synch,
- new BasicConsumeBody(
+ make_shared_ptr(new BasicConsumeBody(
channel.version, 0, queue.getName(), tag, noLocal,
ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
+ fields ? *fields : FieldTable())));
tag = ok->getConsumerTag();
}
@@ -102,7 +102,7 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
channel.sendAndReceiveSync<BasicCancelOkBody>(
- synch, new BasicCancelBody(channel.version, tag, !synch));
+ synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
}
void BasicMessageChannel::close(){
@@ -337,9 +337,9 @@ void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* hand
void BasicMessageChannel::setQos(){
channel.sendAndReceive<BasicQosOkBody>(
- new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
+ make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)));
if(channel.isTransactional())
- channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
+ channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version)));
}
}} // namespace qpid::client
diff --git a/cpp/src/client/ClientChannel.cpp b/cpp/src/client/ClientChannel.cpp
index 99eece46bc..533b590010 100644
--- a/cpp/src/client/ClientChannel.cpp
+++ b/cpp/src/client/ClientChannel.cpp
@@ -60,7 +60,7 @@ void Channel::open(ChannelId id, Connection& con)
init(id, con, con.getVersion()); // ChannelAdapter initialization.
string oob;
if (id != 0)
- sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob));
+ sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob)));
}
void Channel::protocolInit(
@@ -77,10 +77,10 @@ void Channel::protocolInit(
string locale("en_US");
ConnectionTuneBody::shared_ptr proposal =
sendAndReceive<ConnectionTuneBody>(
- new ConnectionStartOkBody(
+ make_shared_ptr(new ConnectionStartOkBody(
version, connectionStart->getRequestId(),
props, mechanism,
- response, locale));
+ response, locale)));
/**
* Assume for now that further challenges will not be required
@@ -136,15 +136,15 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
FieldTable args;
sendAndReceiveSync<ExchangeDeclareOkBody>(
synch,
- new ExchangeDeclareBody(
- version, 0, name, type, false, false, false, false, !synch, args));
+ make_shared_ptr(new ExchangeDeclareBody(
+ version, 0, name, type, false, false, false, false, !synch, args)));
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
sendAndReceiveSync<ExchangeDeleteOkBody>(
synch,
- new ExchangeDeleteBody(version, 0, name, false, !synch));
+ make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch)));
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -153,9 +153,9 @@ void Channel::declareQueue(Queue& queue, bool synch){
QueueDeclareOkBody::shared_ptr response =
sendAndReceiveSync<QueueDeclareOkBody>(
synch,
- new QueueDeclareBody(
+ make_shared_ptr(new QueueDeclareBody(
version, 0, name, false/*passive*/, queue.isDurable(),
- queue.isExclusive(), queue.isAutoDelete(), !synch, args));
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args)));
if(synch) {
if(queue.getName().length() == 0)
queue.setName(response->getQueue());
@@ -167,7 +167,7 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch)
string name = queue.getName();
sendAndReceiveSync<QueueDeleteOkBody>(
synch,
- new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
+ make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)));
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
@@ -175,15 +175,15 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri
string q = queue.getName();
sendAndReceiveSync<QueueBindOkBody>(
synch,
- new QueueBindBody(version, 0, q, e, key,!synch, args));
+ make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args)));
}
void Channel::commit(){
- sendAndReceive<TxCommitOkBody>(new TxCommitBody(version));
+ sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version)));
}
void Channel::rollback(){
- sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version));
+ sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version)));
}
void Channel::handleMethodInContext(
@@ -203,7 +203,8 @@ void Channel::handleMethodInContext(
}
try {
switch (method->amqpClassId()) {
- case BasicDeliverBody::CLASS_ID: messaging->handle(method); break;
+ case MessageOkBody::CLASS_ID:
+ case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
default: throw UnknownMethod();
@@ -261,8 +262,8 @@ void Channel::close(
try {
if (getId() != 0) {
sendAndReceive<ChannelCloseOkBody>(
- new ChannelCloseBody(
- version, code, text, classId, methodId));
+ make_shared_ptr(new ChannelCloseBody(
+ version, code, text, classId, methodId)));
}
static_cast<ConnectionForChannel*>(connection)->erase(getId());
closeInternal();
@@ -292,7 +293,7 @@ void Channel::closeInternal() {
}
AMQMethodBody::shared_ptr Channel::sendAndReceive(
- AMQMethodBody* toSend, ClassId c, MethodId m)
+ AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
{
responses.expect();
send(toSend);
@@ -300,7 +301,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceive(
}
AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
- bool sync, AMQMethodBody* body, ClassId c, MethodId m)
+ bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m)
{
if(sync)
return sendAndReceive(body, c, m);
diff --git a/cpp/src/client/ClientChannel.h b/cpp/src/client/ClientChannel.h
index cf2ea1dbe5..328fc23f68 100644
--- a/cpp/src/client/ClientChannel.h
+++ b/cpp/src/client/ClientChannel.h
@@ -56,6 +56,7 @@ class Channel : public framing::ChannelAdapter
{
private:
struct UnknownMethod {};
+ typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
sys::Mutex lock;
boost::scoped_ptr<MessageChannel> messaging;
@@ -82,21 +83,23 @@ class Channel : public framing::ChannelAdapter
const std::string& vhost);
framing::AMQMethodBody::shared_ptr sendAndReceive(
- framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
+ framing::AMQMethodBody::shared_ptr,
+ framing::ClassId, framing::MethodId);
framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
bool sync,
- framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
+ framing::AMQMethodBody::shared_ptr,
+ framing::ClassId, framing::MethodId);
template <class BodyType>
- boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) {
+ boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
return boost::shared_polymorphic_downcast<BodyType>(
sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
template <class BodyType>
boost::shared_ptr<BodyType> sendAndReceiveSync(
- bool sync, framing::AMQMethodBody* body) {
+ bool sync, framing::AMQMethodBody::shared_ptr body) {
return boost::shared_polymorphic_downcast<BodyType>(
sendAndReceiveSync(
sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
diff --git a/cpp/src/client/ClientConnection.cpp b/cpp/src/client/ClientConnection.cpp
index 365311ab37..b053a45b0f 100644
--- a/cpp/src/client/ClientConnection.cpp
+++ b/cpp/src/client/ClientConnection.cpp
@@ -87,8 +87,8 @@ void Connection::close(
// partly closed with threads left unjoined.
isOpen = false;
channel0.sendAndReceive<ConnectionCloseOkBody>(
- new ConnectionCloseBody(
- getVersion(), code, msg, classId, methodId));
+ make_shared_ptr(new ConnectionCloseBody(
+ getVersion(), code, msg, classId, methodId)));
using boost::bind;
for_each(channels.begin(), channels.end(),
diff --git a/cpp/src/client/ClientMessage.h b/cpp/src/client/ClientMessage.h
index dc25b4070b..35aed6c734 100644
--- a/cpp/src/client/ClientMessage.h
+++ b/cpp/src/client/ClientMessage.h
@@ -33,6 +33,8 @@ namespace client {
*
* \ingroup clientapi
*/
+// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not
+// basic header properties.
class Message : public framing::BasicHeaderProperties {
public:
Message(const std::string& data_=std::string()) : data(data_) {}
diff --git a/cpp/src/client/MessageMessageChannel.cpp b/cpp/src/client/MessageMessageChannel.cpp
index 25fbb95413..164a1cb426 100644
--- a/cpp/src/client/MessageMessageChannel.cpp
+++ b/cpp/src/client/MessageMessageChannel.cpp
@@ -25,6 +25,7 @@
#include "../framing/FieldTable.h"
#include "Connection.h"
#include "../shared_ptr.h"
+#include <boost/bind.hpp>
namespace qpid {
namespace client {
@@ -48,9 +49,9 @@ void MessageMessageChannel::consume(
if (tag.empty())
tag = newTag();
channel.sendAndReceive<MessageOkBody>(
- new MessageConsumeBody(
+ make_shared_ptr(new MessageConsumeBody(
channel.getVersion(), 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, fields ? *fields : FieldTable()));
+ ackMode == NO_ACK, false, fields ? *fields : FieldTable())));
// // FIXME aconway 2007-02-20: Race condition!
// // We could receive the first message for the consumer
@@ -115,16 +116,44 @@ void MessageMessageChannel::close(){
*/
const string getDestinationId("__get__");
+/**
+ * A destination that provides a Correlator::Action to handle
+ * MessageEmpty responses.
+ */
+struct MessageGetDestination : public IncomingMessage::WaitableDestination
+{
+ void response(shared_ptr<AMQResponseBody> response) {
+ if (response->amqpClassId() == MessageOkBody::CLASS_ID) {
+ switch (response->amqpMethodId()) {
+ case MessageOkBody::METHOD_ID:
+ // Nothing to do, wait for transfer.
+ return;
+ case MessageEmptyBody::METHOD_ID:
+ empty(); // Wake up waiter with empty queue.
+ return;
+ }
+ }
+ throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response");
+ }
+
+ Correlator::Action action() {
+ return boost::bind(&MessageGetDestination::response, this, _1);
+ }
+};
+
bool MessageMessageChannel::get(
- Message& , const Queue& , AckMode )
+ Message& msg, const Queue& queue, AckMode ackMode)
{
Mutex::ScopedLock l(lock);
-// incoming.addDestination(getDestinationId, getDest);
-// channel.send(
-// new MessageGetBody(
-// channel.version, 0, queue.getName(), getDestinationId, ackMode));
-// return getDest.wait(msg);
- return false;
+ std::string destName=newTag();
+ MessageGetDestination dest;
+ incoming.addDestination(destName, dest);
+ channel.send(
+ make_shared_ptr(
+ new MessageGetBody(
+ channel.version, 0, queue.getName(), destName, ackMode)),
+ dest.action());
+ return dest.wait(msg);
}
@@ -176,9 +205,30 @@ void MessageMessageChannel::publish(
// FIXME aconway 2007-02-23:
throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented");
}
- channel.sendAndReceive<MessageOkBody>(transfer.get());
+ channel.sendAndReceive<MessageOkBody>(transfer);
}
+void copy(Message& msg, MessageTransferBody& transfer) {
+ // FIXME aconway 2007-04-05: Verify all required fields
+ // are copied.
+ msg.setContentType(transfer.getContentType());
+ msg.setContentEncoding(transfer.getContentEncoding());
+ msg.setHeaders(transfer.getApplicationHeaders());
+ msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode()));
+ msg.setPriority(transfer.getPriority());
+ msg.setCorrelationId(transfer.getCorrelationId());
+ msg.setReplyTo(transfer.getReplyTo());
+ // FIXME aconway 2007-04-05: TTL/Expiration
+ msg.setMessageId(transfer.getMessageId());
+ msg.setTimestamp(transfer.getTimestamp());
+ msg.setUserId(transfer.getUserId());
+ msg.setAppId(transfer.getAppId());
+ msg.setDestination(transfer.getDestination());
+ msg.setRedelivered(transfer.getRedelivered());
+ msg.setDeliveryTag(0); // No meaning in 0-9
+ if (transfer.getBody().isInline())
+ msg.setData(transfer.getBody().getValue());
+}
void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID);
@@ -203,23 +253,38 @@ void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
break;
}
- case MessageEmptyBody::METHOD_ID: {
- // FIXME aconway 2007-04-04:
- // getDest.empty();
+ case MessageTransferBody::METHOD_ID: {
+ MessageTransferBody::shared_ptr transfer=
+ shared_polymorphic_downcast<MessageTransferBody>(method);
+ if (transfer->getBody().isInline()) {
+ Message msg;
+ copy(msg, *transfer);
+ // Deliver it.
+ incoming.getDestination(transfer->getDestination()).message(msg);
+ }
+ else {
+ Message& msg=incoming.createMessage(
+ transfer->getDestination(), transfer->getBody().getValue());
+ copy(msg, *transfer);
+ // Will be delivered when reference closes.
+ }
break;
}
- case MessageCancelBody::METHOD_ID:
- case MessageCheckpointBody::METHOD_ID:
+ case MessageEmptyBody::METHOD_ID:
+ case MessageOkBody::METHOD_ID:
+ // Nothing to do
+ break;
// FIXME aconway 2007-04-03: TODO
- case MessageOkBody::METHOD_ID:
+ case MessageCancelBody::METHOD_ID:
+ case MessageCheckpointBody::METHOD_ID:
case MessageOffsetBody::METHOD_ID:
case MessageQosBody::METHOD_ID:
case MessageRecoverBody::METHOD_ID:
case MessageRejectBody::METHOD_ID:
case MessageResumeBody::METHOD_ID:
- case MessageTransferBody::METHOD_ID:
+ break;
default:
throw Channel::UnknownMethod();
}
@@ -322,10 +387,10 @@ void MessageMessageChannel::setReturnedMessageHandler(
void MessageMessageChannel::setQos(){
channel.sendAndReceive<MessageOkBody>(
- new MessageQosBody(channel.version, 0, channel.getPrefetch(), false));
+ make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)));
if(channel.isTransactional())
channel.sendAndReceive<TxSelectOkBody>(
- new TxSelectBody(channel.version));
+ make_shared_ptr(new TxSelectBody(channel.version)));
}
}} // namespace qpid::client
diff --git a/cpp/src/framing/ChannelAdapter.cpp b/cpp/src/framing/ChannelAdapter.cpp
index 99a14f08fb..d16934a857 100644
--- a/cpp/src/framing/ChannelAdapter.cpp
+++ b/cpp/src/framing/ChannelAdapter.cpp
@@ -35,15 +35,19 @@ void ChannelAdapter::init(
version = v;
}
-RequestId ChannelAdapter::send(AMQBody::shared_ptr body) {
- RequestId result = 0;
+RequestId ChannelAdapter::send(
+ shared_ptr<AMQBody> body, Correlator::Action action)
+{
+ RequestId requestId = 0;
assertChannelOpen();
switch (body->type()) {
case REQUEST_BODY: {
AMQRequestBody::shared_ptr request =
boost::shared_polymorphic_downcast<AMQRequestBody>(body);
requester.sending(request->getData());
- result = request->getData().requestId;
+ requestId = request->getData().requestId;
+ if (!action.empty())
+ correlator.request(requestId, action);
break;
}
case RESPONSE_BODY: {
@@ -52,9 +56,10 @@ RequestId ChannelAdapter::send(AMQBody::shared_ptr body) {
responder.sending(response->getData());
break;
}
+ // No action required for other body types.
}
out->send(new AMQFrame(getVersion(), getId(), body));
- return result;
+ return requestId;
}
void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
@@ -66,10 +71,15 @@ void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
assertMethodOk(*response);
- // TODO aconway 2007-01-30: Consider a response handled on receipt.
- // Review - any cases where this is not the case?
AMQResponseBody::Data& responseData = response->getData();
+
+ // FIXME aconway 2007-04-05: processed should be last
+ // but causes problems with InProcessBroker tests because
+ // we execute client code in handleMethod.
+ // Need to introduce a queue & 2 threads for inprocess.
requester.processed(responseData);
+ // FIXME aconway 2007-04-04: exception handling.
+ correlator.response(response);
handleMethod(response);
}
diff --git a/cpp/src/framing/ChannelAdapter.h b/cpp/src/framing/ChannelAdapter.h
index 493191d92b..1b325495ff 100644
--- a/cpp/src/framing/ChannelAdapter.h
+++ b/cpp/src/framing/ChannelAdapter.h
@@ -22,11 +22,11 @@
*
*/
-#include <boost/shared_ptr.hpp>
-
+#include "../shared_ptr.h"
#include "BodyHandler.h"
#include "Requester.h"
#include "Responder.h"
+#include "Correlator.h"
#include "amqp_types.h"
namespace qpid {
@@ -64,17 +64,24 @@ class ChannelAdapter : public BodyHandler {
ChannelId getId() const { return id; }
ProtocolVersion getVersion() const { return version; }
-
+
/**
- * Wrap body in a frame and send the frame.
- * Takes ownership of body.
+ * Send a frame.
+ *@param body Body of the frame.
+ *@param action optional action to execute when we receive a
+ *response to this frame. Ignored if body is not a Request.
+ *@return If body is a request, the ID assigned else 0.
*/
- RequestId send(AMQBody::shared_ptr body);
+ RequestId send(shared_ptr<AMQBody> body,
+ Correlator::Action action=Correlator::Action());
+
+ // TODO aconway 2007-04-05: remove and use make_shared_ptr at call sites.
+ /**@deprecated Use make_shared_ptr with the other send() override */
RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); }
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
- void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>);
- void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>);
+ void handleMethod(shared_ptr<AMQMethodBody>);
+ void handleRequest(shared_ptr<AMQRequestBody>);
+ void handleResponse(shared_ptr<AMQResponseBody>);
virtual bool isOpen() const = 0;
@@ -84,7 +91,7 @@ class ChannelAdapter : public BodyHandler {
void assertChannelNotOpen() const;
virtual void handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ shared_ptr<AMQMethodBody> method,
const MethodContext& context) = 0;
RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
@@ -97,6 +104,7 @@ class ChannelAdapter : public BodyHandler {
ProtocolVersion version;
Requester requester;
Responder responder;
+ Correlator correlator;
};
}}
diff --git a/cpp/src/framing/Correlator.cpp b/cpp/src/framing/Correlator.cpp
new file mode 100644
index 0000000000..1c18f6b414
--- /dev/null
+++ b/cpp/src/framing/Correlator.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Correlator.h"
+
+namespace qpid {
+namespace framing {
+
+void Correlator::request(RequestId id, Action action) {
+ actions[id] = action;
+}
+
+bool Correlator::response(shared_ptr<AMQResponseBody> r) {
+ Actions::iterator begin = actions.lower_bound(r->getRequestId());
+ Actions::iterator end =
+ actions.upper_bound(r->getRequestId()+r->getBatchOffset());
+ bool didAction = false;
+ for(Actions::iterator i=begin; i != end; ++i) {
+ // FIXME aconway 2007-04-04: Exception handling.
+ didAction = true;
+ i->second(r);
+ actions.erase(i);
+ }
+ return didAction;
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/src/framing/Correlator.h b/cpp/src/framing/Correlator.h
new file mode 100644
index 0000000000..b3eb998149
--- /dev/null
+++ b/cpp/src/framing/Correlator.h
@@ -0,0 +1,68 @@
+#ifndef _framing_Correlator_h
+#define _framing_Correlator_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "../shared_ptr.h"
+#include "../framing/AMQResponseBody.h"
+#include <boost/function.hpp>
+#include <map>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Correlate responses with actions established when sending the request.
+ *
+ * THREAD UNSAFE.
+ */
+class Correlator
+{
+ public:
+ typedef shared_ptr<AMQResponseBody> ResponsePtr;
+ typedef boost::function<void (ResponsePtr)> Action;
+
+ /**
+ * Note that request with id was sent, record an action to call
+ * when a response arrives.
+ */
+ void request(RequestId id, Action doOnResponse);
+
+ /**
+ * Note response received, call action for associated request if any.
+ * Return true of some action(s) were executed.
+ */
+ bool response(shared_ptr<AMQResponseBody>);
+
+ /**
+ * Note the given execution mark was received, call actions
+ * for any requests that are impicitly responded to.
+ */
+ void mark(RequestId mark);
+
+ private:
+ typedef std::map<RequestId, Action> Actions;
+ Actions actions;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_Correlator_h*/
diff --git a/cpp/src/framing/Requester.h b/cpp/src/framing/Requester.h
index dcc4460041..65bdc9a5a1 100644
--- a/cpp/src/framing/Requester.h
+++ b/cpp/src/framing/Requester.h
@@ -32,8 +32,7 @@ class AMQResponseBody;
/**
* Manage request IDs and the response mark for locally initiated requests.
*
- * THREAD UNSAFE: This class is called as frames are sent or received
- * sequentially on a connection, so it does not need to be thread safe.
+ * THREAD UNSAFE: must be locked externally.
*/
class Requester
{
@@ -46,12 +45,14 @@ class Requester
/** Called after processing a response. */
void processed(const AMQResponseBody::Data&);
- /** Get the next request id to be used. */
- RequestId getNextId() { return lastId + 1; }
- /** Get the first request acked by this response */
- RequestId getFirstAckRequest() { return firstAckRequest; }
- /** Get the last request acked by this response */
- RequestId getLastAckRequest() { return lastAckRequest; }
+ /** Get the next request id to be used. */
+ RequestId getNextId() { return lastId + 1; }
+
+ /** Get the first request acked by last response */
+ RequestId getFirstAckRequest() { return firstAckRequest; }
+
+ /** Get the last request acked by last response */
+ RequestId getLastAckRequest() { return lastAckRequest; }
private:
RequestId lastId;
diff --git a/cpp/src/shared_ptr.h b/cpp/src/shared_ptr.h
index c4d547e5bb..df08c325df 100644
--- a/cpp/src/shared_ptr.h
+++ b/cpp/src/shared_ptr.h
@@ -23,12 +23,20 @@
#include <boost/cast.hpp>
namespace qpid {
-/// Import shared_ptr definitions into qpid namespace.
+
+// Import shared_ptr definitions into qpid namespace and define some
+// useful shared_ptr templates for convenience.
+
using boost::shared_ptr;
using boost::dynamic_pointer_cast;
using boost::static_pointer_cast;
using boost::const_pointer_cast;
using boost::shared_polymorphic_downcast;
+
+template <class T> shared_ptr<T> make_shared_ptr(T* ptr) {
+ return shared_ptr<T>(ptr);
+}
+
} // namespace qpid
diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp
index d5d1005aa9..8dc3e4b432 100644
--- a/cpp/src/tests/ClientChannelTest.cpp
+++ b/cpp/src/tests/ClientChannelTest.cpp
@@ -27,6 +27,7 @@
#include "../client/ClientExchange.h"
#include "../client/MessageListener.h"
#include "../client/BasicMessageChannel.h"
+#include "../client/MessageMessageChannel.h"
using namespace std;
using namespace boost;
@@ -203,7 +204,17 @@ class BasicClientChannelTest : public ClientChannelTestBase {
}
};
+class MessageClientChannelTest : public ClientChannelTestBase {
+ CPPUNIT_TEST_SUITE(MessageClientChannelTest);
+ CPPUNIT_TEST(testPublishGet);
+ CPPUNIT_TEST_SUITE_END();
+ public:
+ MessageClientChannelTest() {
+ channel.reset(new Channel(false, 500, Channel::AMQP_09));
+ }
+};
// Make this test suite a plugin.
CPPUNIT_PLUGIN_IMPLEMENT();
CPPUNIT_TEST_SUITE_REGISTRATION(BasicClientChannelTest);
+CPPUNIT_TEST_SUITE_REGISTRATION(MessageClientChannelTest);
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index 954c378c37..aa7cd90bc2 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -18,9 +18,6 @@
* under the License.
*
*/
-#include <memory>
-#include <boost/lexical_cast.hpp>
-
#include "ConnectionRedirectBody.h"
#include "../framing/ProtocolVersion.h"
#include "../framing/amqp_framing.h"
@@ -38,6 +35,11 @@
#include "../client/Connection.h"
#include "../client/ClientExchange.h"
#include "../client/ClientQueue.h"
+#include "../framing/Correlator.h"
+#include "BasicGetOkBody.h"
+#include <memory>
+#include <boost/lexical_cast.hpp>
+#include <boost/bind.hpp>
using namespace qpid;
using namespace qpid::framing;
@@ -65,6 +67,7 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_TEST(testResponseBodyFrame);
CPPUNIT_TEST(testRequester);
CPPUNIT_TEST(testResponder);
+ CPPUNIT_TEST(testCorrelator);
CPPUNIT_TEST(testInlineContent);
CPPUNIT_TEST(testContentReference);
CPPUNIT_TEST(testContentValidation);
@@ -300,7 +303,7 @@ class FramingTest : public CppUnit::TestCase
Responder r;
AMQRequestBody::Data q;
AMQResponseBody::Data p;
-
+
q.requestId = 1;
q.responseMark = 0;
r.received(q);
@@ -335,6 +338,48 @@ class FramingTest : public CppUnit::TestCase
}
+
+ std::vector<Correlator::ResponsePtr> correlations;
+
+ void correlatorCallback(Correlator::ResponsePtr r) {
+ correlations.push_back(r);
+ }
+
+ struct DummyResponse : public AMQResponseBody {
+ DummyResponse(ResponseId id=0, RequestId req=0, BatchOffset off=0)
+ : AMQResponseBody(version, id, req, off) {}
+ uint32_t size() const { return 0; }
+ void print(std::ostream&) const {}
+ MethodId amqpMethodId() const { return 0; }
+ ClassId amqpClassId() const { return 0; }
+ void encodeContent(Buffer& ) const {}
+ void decodeContent(Buffer& ) {}
+ };
+
+ void testCorrelator() {
+ CPPUNIT_ASSERT(correlations.empty());
+ Correlator c;
+ Correlator::Action action = boost::bind(&FramingTest::correlatorCallback, this, _1);
+ c.request(5, action);
+ Correlator::ResponsePtr r1(new DummyResponse(3, 5, 0));
+ CPPUNIT_ASSERT(c.response(r1));
+ CPPUNIT_ASSERT_EQUAL(size_t(1), correlations.size());
+ CPPUNIT_ASSERT(correlations.front() == r1);
+ correlations.clear();
+
+ c.request(6, action);
+ c.request(7, action);
+ c.request(8, action);
+ Correlator::ResponsePtr r2(new DummyResponse(4, 6, 3));
+ CPPUNIT_ASSERT(c.response(r2));
+ CPPUNIT_ASSERT_EQUAL(size_t(3), correlations.size());
+ CPPUNIT_ASSERT(r2 == correlations[0]);
+ CPPUNIT_ASSERT(r2 == correlations[1]);
+ CPPUNIT_ASSERT(r2 == correlations[2]);
+ Correlator::ResponsePtr r3(new DummyResponse(5, 99, 0));
+ CPPUNIT_ASSERT(!c.response(r3));
+ }
+
// expect may contain null chars so use string(ptr,size) constructor
// Use sizeof(expect)-1 to strip the trailing null.
#define ASSERT_FRAME(expect, frame) \
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 0dc6c3343e..a5d1fdbab5 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -110,6 +110,9 @@ gen.mk: Makefile.am
check: .valgrindrc $(check_LTLIBRARIES) $(lib_common) $(lib_client) $(lib_broker)
+check-unit:
+ $(MAKE) check TESTS=run-unit-tests
+
# Create a copy so user can modify without risk of checking in their mods.
.valgrindrc: .valgrindrc-default
cp .valgrindrc-default .valgrindrc