diff options
author | Alan Conway <aconway@apache.org> | 2007-04-05 19:16:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-04-05 19:16:09 +0000 |
commit | bb79efff2408de5f6cd66089cde8b8a82cc80cc2 (patch) | |
tree | 9d9c72158da31cebdd7ee538a11951b240922065 /cpp/src | |
parent | 2a1e4c9663ff0725c061248a96ebab763678fdd6 (diff) | |
download | qpid-python-bb79efff2408de5f6cd66089cde8b8a82cc80cc2.tar.gz |
* Exteneded use of shared pointers frame bodies across all send() commands.
* tests/Makefile.am: added check-unit target to run just unit tests.
* Introduced make_shared_ptr convenience function for wrapping
plain pointers with shared_ptr.
* cpp/src/client/ClientChannel.h,cpp (sendsendAndReceive,sendAndReceiveSync):
Pass shared_ptr instead of raw ptr to fix memory problems.
Updated the following files to use make_shared_ptr
- src/client/BasicMessageChannel.cpp
- src/client/ClientConnection.cpp
* src/client/MessageMessageChannel.cpp: implemented 0-9 message.get.
* src/framing/Correlator.h,cpp: Allow request sender to register actions
to take when the correlated response arrives.
* cpp/src/tests/FramingTest.cpp: Added Correlator tests.
* src/framing/ChannelAdapter.h,cpp: use Correlator to dispatch
response actions.
* cpp/src/shared_ptr.h (make_shared_ptr): Convenience function
to make a shared pointer from a raw pointer.
* cpp/src/tests/ClientChannelTest.cpp: Added message.get test.
* cpp/src/tests/Makefile.am (check-unit): Added test-unit target
to run unit tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525932 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/client/BasicMessageChannel.cpp | 10 | ||||
-rw-r--r-- | cpp/src/client/ClientChannel.cpp | 35 | ||||
-rw-r--r-- | cpp/src/client/ClientChannel.h | 11 | ||||
-rw-r--r-- | cpp/src/client/ClientConnection.cpp | 4 | ||||
-rw-r--r-- | cpp/src/client/ClientMessage.h | 2 | ||||
-rw-r--r-- | cpp/src/client/MessageMessageChannel.cpp | 103 | ||||
-rw-r--r-- | cpp/src/framing/ChannelAdapter.cpp | 22 | ||||
-rw-r--r-- | cpp/src/framing/ChannelAdapter.h | 28 | ||||
-rw-r--r-- | cpp/src/framing/Correlator.cpp | 42 | ||||
-rw-r--r-- | cpp/src/framing/Correlator.h | 68 | ||||
-rw-r--r-- | cpp/src/framing/Requester.h | 17 | ||||
-rw-r--r-- | cpp/src/shared_ptr.h | 10 | ||||
-rw-r--r-- | cpp/src/tests/ClientChannelTest.cpp | 11 | ||||
-rw-r--r-- | cpp/src/tests/FramingTest.cpp | 53 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 3 |
16 files changed, 345 insertions, 77 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 00e31a7d1a..50f271697d 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -92,10 +92,11 @@ libqpidcommon_la_SOURCES = \ $(framing)/ProtocolVersionException.cpp \ $(framing)/Requester.cpp \ $(framing)/Responder.cpp \ + $(framing)/Correlator.cpp \ $(framing)/Value.cpp \ $(framing)/Proxy.cpp \ $(gen)/AMQP_ClientProxy.cpp \ - $(gen)/AMQP_HighestVersion.h \ + $(gen)/AMQP_HighestVersion.h \ $(gen)/AMQP_MethodVersionMap.cpp \ $(gen)/AMQP_ServerProxy.cpp \ Exception.cpp \ diff --git a/cpp/src/client/BasicMessageChannel.cpp b/cpp/src/client/BasicMessageChannel.cpp index 9e3d184673..c577c0a305 100644 --- a/cpp/src/client/BasicMessageChannel.cpp +++ b/cpp/src/client/BasicMessageChannel.cpp @@ -81,10 +81,10 @@ void BasicMessageChannel::consume( BasicConsumeOkBody::shared_ptr ok = channel.sendAndReceiveSync<BasicConsumeOkBody>( synch, - new BasicConsumeBody( + make_shared_ptr(new BasicConsumeBody( channel.version, 0, queue.getName(), tag, noLocal, ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable())); + fields ? *fields : FieldTable()))); tag = ok->getConsumerTag(); } @@ -102,7 +102,7 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) { if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); channel.sendAndReceiveSync<BasicCancelOkBody>( - synch, new BasicCancelBody(channel.version, tag, !synch)); + synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch))); } void BasicMessageChannel::close(){ @@ -337,9 +337,9 @@ void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* hand void BasicMessageChannel::setQos(){ channel.sendAndReceive<BasicQosOkBody>( - new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)); + make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false))); if(channel.isTransactional()) - channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version)); + channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version))); } }} // namespace qpid::client diff --git a/cpp/src/client/ClientChannel.cpp b/cpp/src/client/ClientChannel.cpp index 99eece46bc..533b590010 100644 --- a/cpp/src/client/ClientChannel.cpp +++ b/cpp/src/client/ClientChannel.cpp @@ -60,7 +60,7 @@ void Channel::open(ChannelId id, Connection& con) init(id, con, con.getVersion()); // ChannelAdapter initialization. string oob; if (id != 0) - sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob)); + sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob))); } void Channel::protocolInit( @@ -77,10 +77,10 @@ void Channel::protocolInit( string locale("en_US"); ConnectionTuneBody::shared_ptr proposal = sendAndReceive<ConnectionTuneBody>( - new ConnectionStartOkBody( + make_shared_ptr(new ConnectionStartOkBody( version, connectionStart->getRequestId(), props, mechanism, - response, locale)); + response, locale))); /** * Assume for now that further challenges will not be required @@ -136,15 +136,15 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ FieldTable args; sendAndReceiveSync<ExchangeDeclareOkBody>( synch, - new ExchangeDeclareBody( - version, 0, name, type, false, false, false, false, !synch, args)); + make_shared_ptr(new ExchangeDeclareBody( + version, 0, name, type, false, false, false, false, !synch, args))); } void Channel::deleteExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); sendAndReceiveSync<ExchangeDeleteOkBody>( synch, - new ExchangeDeleteBody(version, 0, name, false, !synch)); + make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch))); } void Channel::declareQueue(Queue& queue, bool synch){ @@ -153,9 +153,9 @@ void Channel::declareQueue(Queue& queue, bool synch){ QueueDeclareOkBody::shared_ptr response = sendAndReceiveSync<QueueDeclareOkBody>( synch, - new QueueDeclareBody( + make_shared_ptr(new QueueDeclareBody( version, 0, name, false/*passive*/, queue.isDurable(), - queue.isExclusive(), queue.isAutoDelete(), !synch, args)); + queue.isExclusive(), queue.isAutoDelete(), !synch, args))); if(synch) { if(queue.getName().length() == 0) queue.setName(response->getQueue()); @@ -167,7 +167,7 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch) string name = queue.getName(); sendAndReceiveSync<QueueDeleteOkBody>( synch, - new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); + make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch))); } void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ @@ -175,15 +175,15 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri string q = queue.getName(); sendAndReceiveSync<QueueBindOkBody>( synch, - new QueueBindBody(version, 0, q, e, key,!synch, args)); + make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args))); } void Channel::commit(){ - sendAndReceive<TxCommitOkBody>(new TxCommitBody(version)); + sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version))); } void Channel::rollback(){ - sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version)); + sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version))); } void Channel::handleMethodInContext( @@ -203,7 +203,8 @@ void Channel::handleMethodInContext( } try { switch (method->amqpClassId()) { - case BasicDeliverBody::CLASS_ID: messaging->handle(method); break; + case MessageOkBody::CLASS_ID: + case BasicGetOkBody::CLASS_ID: messaging->handle(method); break; case ChannelCloseBody::CLASS_ID: handleChannel(method); break; case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; default: throw UnknownMethod(); @@ -261,8 +262,8 @@ void Channel::close( try { if (getId() != 0) { sendAndReceive<ChannelCloseOkBody>( - new ChannelCloseBody( - version, code, text, classId, methodId)); + make_shared_ptr(new ChannelCloseBody( + version, code, text, classId, methodId))); } static_cast<ConnectionForChannel*>(connection)->erase(getId()); closeInternal(); @@ -292,7 +293,7 @@ void Channel::closeInternal() { } AMQMethodBody::shared_ptr Channel::sendAndReceive( - AMQMethodBody* toSend, ClassId c, MethodId m) + AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m) { responses.expect(); send(toSend); @@ -300,7 +301,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceive( } AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( - bool sync, AMQMethodBody* body, ClassId c, MethodId m) + bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m) { if(sync) return sendAndReceive(body, c, m); diff --git a/cpp/src/client/ClientChannel.h b/cpp/src/client/ClientChannel.h index cf2ea1dbe5..328fc23f68 100644 --- a/cpp/src/client/ClientChannel.h +++ b/cpp/src/client/ClientChannel.h @@ -56,6 +56,7 @@ class Channel : public framing::ChannelAdapter { private: struct UnknownMethod {}; + typedef shared_ptr<framing::AMQMethodBody> MethodPtr; sys::Mutex lock; boost::scoped_ptr<MessageChannel> messaging; @@ -82,21 +83,23 @@ class Channel : public framing::ChannelAdapter const std::string& vhost); framing::AMQMethodBody::shared_ptr sendAndReceive( - framing::AMQMethodBody*, framing::ClassId, framing::MethodId); + framing::AMQMethodBody::shared_ptr, + framing::ClassId, framing::MethodId); framing::AMQMethodBody::shared_ptr sendAndReceiveSync( bool sync, - framing::AMQMethodBody*, framing::ClassId, framing::MethodId); + framing::AMQMethodBody::shared_ptr, + framing::ClassId, framing::MethodId); template <class BodyType> - boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) { + boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) { return boost::shared_polymorphic_downcast<BodyType>( sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID)); } template <class BodyType> boost::shared_ptr<BodyType> sendAndReceiveSync( - bool sync, framing::AMQMethodBody* body) { + bool sync, framing::AMQMethodBody::shared_ptr body) { return boost::shared_polymorphic_downcast<BodyType>( sendAndReceiveSync( sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID)); diff --git a/cpp/src/client/ClientConnection.cpp b/cpp/src/client/ClientConnection.cpp index 365311ab37..b053a45b0f 100644 --- a/cpp/src/client/ClientConnection.cpp +++ b/cpp/src/client/ClientConnection.cpp @@ -87,8 +87,8 @@ void Connection::close( // partly closed with threads left unjoined. isOpen = false; channel0.sendAndReceive<ConnectionCloseOkBody>( - new ConnectionCloseBody( - getVersion(), code, msg, classId, methodId)); + make_shared_ptr(new ConnectionCloseBody( + getVersion(), code, msg, classId, methodId))); using boost::bind; for_each(channels.begin(), channels.end(), diff --git a/cpp/src/client/ClientMessage.h b/cpp/src/client/ClientMessage.h index dc25b4070b..35aed6c734 100644 --- a/cpp/src/client/ClientMessage.h +++ b/cpp/src/client/ClientMessage.h @@ -33,6 +33,8 @@ namespace client { * * \ingroup clientapi */ +// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not +// basic header properties. class Message : public framing::BasicHeaderProperties { public: Message(const std::string& data_=std::string()) : data(data_) {} diff --git a/cpp/src/client/MessageMessageChannel.cpp b/cpp/src/client/MessageMessageChannel.cpp index 25fbb95413..164a1cb426 100644 --- a/cpp/src/client/MessageMessageChannel.cpp +++ b/cpp/src/client/MessageMessageChannel.cpp @@ -25,6 +25,7 @@ #include "../framing/FieldTable.h" #include "Connection.h" #include "../shared_ptr.h" +#include <boost/bind.hpp> namespace qpid { namespace client { @@ -48,9 +49,9 @@ void MessageMessageChannel::consume( if (tag.empty()) tag = newTag(); channel.sendAndReceive<MessageOkBody>( - new MessageConsumeBody( + make_shared_ptr(new MessageConsumeBody( channel.getVersion(), 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, fields ? *fields : FieldTable())); + ackMode == NO_ACK, false, fields ? *fields : FieldTable()))); // // FIXME aconway 2007-02-20: Race condition! // // We could receive the first message for the consumer @@ -115,16 +116,44 @@ void MessageMessageChannel::close(){ */ const string getDestinationId("__get__"); +/** + * A destination that provides a Correlator::Action to handle + * MessageEmpty responses. + */ +struct MessageGetDestination : public IncomingMessage::WaitableDestination +{ + void response(shared_ptr<AMQResponseBody> response) { + if (response->amqpClassId() == MessageOkBody::CLASS_ID) { + switch (response->amqpMethodId()) { + case MessageOkBody::METHOD_ID: + // Nothing to do, wait for transfer. + return; + case MessageEmptyBody::METHOD_ID: + empty(); // Wake up waiter with empty queue. + return; + } + } + throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response"); + } + + Correlator::Action action() { + return boost::bind(&MessageGetDestination::response, this, _1); + } +}; + bool MessageMessageChannel::get( - Message& , const Queue& , AckMode ) + Message& msg, const Queue& queue, AckMode ackMode) { Mutex::ScopedLock l(lock); -// incoming.addDestination(getDestinationId, getDest); -// channel.send( -// new MessageGetBody( -// channel.version, 0, queue.getName(), getDestinationId, ackMode)); -// return getDest.wait(msg); - return false; + std::string destName=newTag(); + MessageGetDestination dest; + incoming.addDestination(destName, dest); + channel.send( + make_shared_ptr( + new MessageGetBody( + channel.version, 0, queue.getName(), destName, ackMode)), + dest.action()); + return dest.wait(msg); } @@ -176,9 +205,30 @@ void MessageMessageChannel::publish( // FIXME aconway 2007-02-23: throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented"); } - channel.sendAndReceive<MessageOkBody>(transfer.get()); + channel.sendAndReceive<MessageOkBody>(transfer); } +void copy(Message& msg, MessageTransferBody& transfer) { + // FIXME aconway 2007-04-05: Verify all required fields + // are copied. + msg.setContentType(transfer.getContentType()); + msg.setContentEncoding(transfer.getContentEncoding()); + msg.setHeaders(transfer.getApplicationHeaders()); + msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode())); + msg.setPriority(transfer.getPriority()); + msg.setCorrelationId(transfer.getCorrelationId()); + msg.setReplyTo(transfer.getReplyTo()); + // FIXME aconway 2007-04-05: TTL/Expiration + msg.setMessageId(transfer.getMessageId()); + msg.setTimestamp(transfer.getTimestamp()); + msg.setUserId(transfer.getUserId()); + msg.setAppId(transfer.getAppId()); + msg.setDestination(transfer.getDestination()); + msg.setRedelivered(transfer.getRedelivered()); + msg.setDeliveryTag(0); // No meaning in 0-9 + if (transfer.getBody().isInline()) + msg.setData(transfer.getBody().getValue()); +} void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID); @@ -203,23 +253,38 @@ void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { break; } - case MessageEmptyBody::METHOD_ID: { - // FIXME aconway 2007-04-04: - // getDest.empty(); + case MessageTransferBody::METHOD_ID: { + MessageTransferBody::shared_ptr transfer= + shared_polymorphic_downcast<MessageTransferBody>(method); + if (transfer->getBody().isInline()) { + Message msg; + copy(msg, *transfer); + // Deliver it. + incoming.getDestination(transfer->getDestination()).message(msg); + } + else { + Message& msg=incoming.createMessage( + transfer->getDestination(), transfer->getBody().getValue()); + copy(msg, *transfer); + // Will be delivered when reference closes. + } break; } - case MessageCancelBody::METHOD_ID: - case MessageCheckpointBody::METHOD_ID: + case MessageEmptyBody::METHOD_ID: + case MessageOkBody::METHOD_ID: + // Nothing to do + break; // FIXME aconway 2007-04-03: TODO - case MessageOkBody::METHOD_ID: + case MessageCancelBody::METHOD_ID: + case MessageCheckpointBody::METHOD_ID: case MessageOffsetBody::METHOD_ID: case MessageQosBody::METHOD_ID: case MessageRecoverBody::METHOD_ID: case MessageRejectBody::METHOD_ID: case MessageResumeBody::METHOD_ID: - case MessageTransferBody::METHOD_ID: + break; default: throw Channel::UnknownMethod(); } @@ -322,10 +387,10 @@ void MessageMessageChannel::setReturnedMessageHandler( void MessageMessageChannel::setQos(){ channel.sendAndReceive<MessageOkBody>( - new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)); + make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false))); if(channel.isTransactional()) channel.sendAndReceive<TxSelectOkBody>( - new TxSelectBody(channel.version)); + make_shared_ptr(new TxSelectBody(channel.version))); } }} // namespace qpid::client diff --git a/cpp/src/framing/ChannelAdapter.cpp b/cpp/src/framing/ChannelAdapter.cpp index 99a14f08fb..d16934a857 100644 --- a/cpp/src/framing/ChannelAdapter.cpp +++ b/cpp/src/framing/ChannelAdapter.cpp @@ -35,15 +35,19 @@ void ChannelAdapter::init( version = v; } -RequestId ChannelAdapter::send(AMQBody::shared_ptr body) { - RequestId result = 0; +RequestId ChannelAdapter::send( + shared_ptr<AMQBody> body, Correlator::Action action) +{ + RequestId requestId = 0; assertChannelOpen(); switch (body->type()) { case REQUEST_BODY: { AMQRequestBody::shared_ptr request = boost::shared_polymorphic_downcast<AMQRequestBody>(body); requester.sending(request->getData()); - result = request->getData().requestId; + requestId = request->getData().requestId; + if (!action.empty()) + correlator.request(requestId, action); break; } case RESPONSE_BODY: { @@ -52,9 +56,10 @@ RequestId ChannelAdapter::send(AMQBody::shared_ptr body) { responder.sending(response->getData()); break; } + // No action required for other body types. } out->send(new AMQFrame(getVersion(), getId(), body)); - return result; + return requestId; } void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { @@ -66,10 +71,15 @@ void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { assertMethodOk(*response); - // TODO aconway 2007-01-30: Consider a response handled on receipt. - // Review - any cases where this is not the case? AMQResponseBody::Data& responseData = response->getData(); + + // FIXME aconway 2007-04-05: processed should be last + // but causes problems with InProcessBroker tests because + // we execute client code in handleMethod. + // Need to introduce a queue & 2 threads for inprocess. requester.processed(responseData); + // FIXME aconway 2007-04-04: exception handling. + correlator.response(response); handleMethod(response); } diff --git a/cpp/src/framing/ChannelAdapter.h b/cpp/src/framing/ChannelAdapter.h index 493191d92b..1b325495ff 100644 --- a/cpp/src/framing/ChannelAdapter.h +++ b/cpp/src/framing/ChannelAdapter.h @@ -22,11 +22,11 @@ * */ -#include <boost/shared_ptr.hpp> - +#include "../shared_ptr.h" #include "BodyHandler.h" #include "Requester.h" #include "Responder.h" +#include "Correlator.h" #include "amqp_types.h" namespace qpid { @@ -64,17 +64,24 @@ class ChannelAdapter : public BodyHandler { ChannelId getId() const { return id; } ProtocolVersion getVersion() const { return version; } - + /** - * Wrap body in a frame and send the frame. - * Takes ownership of body. + * Send a frame. + *@param body Body of the frame. + *@param action optional action to execute when we receive a + *response to this frame. Ignored if body is not a Request. + *@return If body is a request, the ID assigned else 0. */ - RequestId send(AMQBody::shared_ptr body); + RequestId send(shared_ptr<AMQBody> body, + Correlator::Action action=Correlator::Action()); + + // TODO aconway 2007-04-05: remove and use make_shared_ptr at call sites. + /**@deprecated Use make_shared_ptr with the other send() override */ RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); } - void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>); - void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>); - void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>); + void handleMethod(shared_ptr<AMQMethodBody>); + void handleRequest(shared_ptr<AMQRequestBody>); + void handleResponse(shared_ptr<AMQResponseBody>); virtual bool isOpen() const = 0; @@ -84,7 +91,7 @@ class ChannelAdapter : public BodyHandler { void assertChannelNotOpen() const; virtual void handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, + shared_ptr<AMQMethodBody> method, const MethodContext& context) = 0; RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); } @@ -97,6 +104,7 @@ class ChannelAdapter : public BodyHandler { ProtocolVersion version; Requester requester; Responder responder; + Correlator correlator; }; }} diff --git a/cpp/src/framing/Correlator.cpp b/cpp/src/framing/Correlator.cpp new file mode 100644 index 0000000000..1c18f6b414 --- /dev/null +++ b/cpp/src/framing/Correlator.cpp @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Correlator.h" + +namespace qpid { +namespace framing { + +void Correlator::request(RequestId id, Action action) { + actions[id] = action; +} + +bool Correlator::response(shared_ptr<AMQResponseBody> r) { + Actions::iterator begin = actions.lower_bound(r->getRequestId()); + Actions::iterator end = + actions.upper_bound(r->getRequestId()+r->getBatchOffset()); + bool didAction = false; + for(Actions::iterator i=begin; i != end; ++i) { + // FIXME aconway 2007-04-04: Exception handling. + didAction = true; + i->second(r); + actions.erase(i); + } + return didAction; +} + +}} // namespace qpid::framing diff --git a/cpp/src/framing/Correlator.h b/cpp/src/framing/Correlator.h new file mode 100644 index 0000000000..b3eb998149 --- /dev/null +++ b/cpp/src/framing/Correlator.h @@ -0,0 +1,68 @@ +#ifndef _framing_Correlator_h +#define _framing_Correlator_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "../shared_ptr.h" +#include "../framing/AMQResponseBody.h" +#include <boost/function.hpp> +#include <map> + +namespace qpid { +namespace framing { + +/** + * Correlate responses with actions established when sending the request. + * + * THREAD UNSAFE. + */ +class Correlator +{ + public: + typedef shared_ptr<AMQResponseBody> ResponsePtr; + typedef boost::function<void (ResponsePtr)> Action; + + /** + * Note that request with id was sent, record an action to call + * when a response arrives. + */ + void request(RequestId id, Action doOnResponse); + + /** + * Note response received, call action for associated request if any. + * Return true of some action(s) were executed. + */ + bool response(shared_ptr<AMQResponseBody>); + + /** + * Note the given execution mark was received, call actions + * for any requests that are impicitly responded to. + */ + void mark(RequestId mark); + + private: + typedef std::map<RequestId, Action> Actions; + Actions actions; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_Correlator_h*/ diff --git a/cpp/src/framing/Requester.h b/cpp/src/framing/Requester.h index dcc4460041..65bdc9a5a1 100644 --- a/cpp/src/framing/Requester.h +++ b/cpp/src/framing/Requester.h @@ -32,8 +32,7 @@ class AMQResponseBody; /** * Manage request IDs and the response mark for locally initiated requests. * - * THREAD UNSAFE: This class is called as frames are sent or received - * sequentially on a connection, so it does not need to be thread safe. + * THREAD UNSAFE: must be locked externally. */ class Requester { @@ -46,12 +45,14 @@ class Requester /** Called after processing a response. */ void processed(const AMQResponseBody::Data&); - /** Get the next request id to be used. */ - RequestId getNextId() { return lastId + 1; } - /** Get the first request acked by this response */ - RequestId getFirstAckRequest() { return firstAckRequest; } - /** Get the last request acked by this response */ - RequestId getLastAckRequest() { return lastAckRequest; } + /** Get the next request id to be used. */ + RequestId getNextId() { return lastId + 1; } + + /** Get the first request acked by last response */ + RequestId getFirstAckRequest() { return firstAckRequest; } + + /** Get the last request acked by last response */ + RequestId getLastAckRequest() { return lastAckRequest; } private: RequestId lastId; diff --git a/cpp/src/shared_ptr.h b/cpp/src/shared_ptr.h index c4d547e5bb..df08c325df 100644 --- a/cpp/src/shared_ptr.h +++ b/cpp/src/shared_ptr.h @@ -23,12 +23,20 @@ #include <boost/cast.hpp> namespace qpid { -/// Import shared_ptr definitions into qpid namespace. + +// Import shared_ptr definitions into qpid namespace and define some +// useful shared_ptr templates for convenience. + using boost::shared_ptr; using boost::dynamic_pointer_cast; using boost::static_pointer_cast; using boost::const_pointer_cast; using boost::shared_polymorphic_downcast; + +template <class T> shared_ptr<T> make_shared_ptr(T* ptr) { + return shared_ptr<T>(ptr); +} + } // namespace qpid diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp index d5d1005aa9..8dc3e4b432 100644 --- a/cpp/src/tests/ClientChannelTest.cpp +++ b/cpp/src/tests/ClientChannelTest.cpp @@ -27,6 +27,7 @@ #include "../client/ClientExchange.h" #include "../client/MessageListener.h" #include "../client/BasicMessageChannel.h" +#include "../client/MessageMessageChannel.h" using namespace std; using namespace boost; @@ -203,7 +204,17 @@ class BasicClientChannelTest : public ClientChannelTestBase { } }; +class MessageClientChannelTest : public ClientChannelTestBase { + CPPUNIT_TEST_SUITE(MessageClientChannelTest); + CPPUNIT_TEST(testPublishGet); + CPPUNIT_TEST_SUITE_END(); + public: + MessageClientChannelTest() { + channel.reset(new Channel(false, 500, Channel::AMQP_09)); + } +}; // Make this test suite a plugin. CPPUNIT_PLUGIN_IMPLEMENT(); CPPUNIT_TEST_SUITE_REGISTRATION(BasicClientChannelTest); +CPPUNIT_TEST_SUITE_REGISTRATION(MessageClientChannelTest); diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp index 954c378c37..aa7cd90bc2 100644 --- a/cpp/src/tests/FramingTest.cpp +++ b/cpp/src/tests/FramingTest.cpp @@ -18,9 +18,6 @@ * under the License. * */ -#include <memory> -#include <boost/lexical_cast.hpp> - #include "ConnectionRedirectBody.h" #include "../framing/ProtocolVersion.h" #include "../framing/amqp_framing.h" @@ -38,6 +35,11 @@ #include "../client/Connection.h" #include "../client/ClientExchange.h" #include "../client/ClientQueue.h" +#include "../framing/Correlator.h" +#include "BasicGetOkBody.h" +#include <memory> +#include <boost/lexical_cast.hpp> +#include <boost/bind.hpp> using namespace qpid; using namespace qpid::framing; @@ -65,6 +67,7 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_TEST(testResponseBodyFrame); CPPUNIT_TEST(testRequester); CPPUNIT_TEST(testResponder); + CPPUNIT_TEST(testCorrelator); CPPUNIT_TEST(testInlineContent); CPPUNIT_TEST(testContentReference); CPPUNIT_TEST(testContentValidation); @@ -300,7 +303,7 @@ class FramingTest : public CppUnit::TestCase Responder r; AMQRequestBody::Data q; AMQResponseBody::Data p; - + q.requestId = 1; q.responseMark = 0; r.received(q); @@ -335,6 +338,48 @@ class FramingTest : public CppUnit::TestCase } + + std::vector<Correlator::ResponsePtr> correlations; + + void correlatorCallback(Correlator::ResponsePtr r) { + correlations.push_back(r); + } + + struct DummyResponse : public AMQResponseBody { + DummyResponse(ResponseId id=0, RequestId req=0, BatchOffset off=0) + : AMQResponseBody(version, id, req, off) {} + uint32_t size() const { return 0; } + void print(std::ostream&) const {} + MethodId amqpMethodId() const { return 0; } + ClassId amqpClassId() const { return 0; } + void encodeContent(Buffer& ) const {} + void decodeContent(Buffer& ) {} + }; + + void testCorrelator() { + CPPUNIT_ASSERT(correlations.empty()); + Correlator c; + Correlator::Action action = boost::bind(&FramingTest::correlatorCallback, this, _1); + c.request(5, action); + Correlator::ResponsePtr r1(new DummyResponse(3, 5, 0)); + CPPUNIT_ASSERT(c.response(r1)); + CPPUNIT_ASSERT_EQUAL(size_t(1), correlations.size()); + CPPUNIT_ASSERT(correlations.front() == r1); + correlations.clear(); + + c.request(6, action); + c.request(7, action); + c.request(8, action); + Correlator::ResponsePtr r2(new DummyResponse(4, 6, 3)); + CPPUNIT_ASSERT(c.response(r2)); + CPPUNIT_ASSERT_EQUAL(size_t(3), correlations.size()); + CPPUNIT_ASSERT(r2 == correlations[0]); + CPPUNIT_ASSERT(r2 == correlations[1]); + CPPUNIT_ASSERT(r2 == correlations[2]); + Correlator::ResponsePtr r3(new DummyResponse(5, 99, 0)); + CPPUNIT_ASSERT(!c.response(r3)); + } + // expect may contain null chars so use string(ptr,size) constructor // Use sizeof(expect)-1 to strip the trailing null. #define ASSERT_FRAME(expect, frame) \ diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 0dc6c3343e..a5d1fdbab5 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -110,6 +110,9 @@ gen.mk: Makefile.am check: .valgrindrc $(check_LTLIBRARIES) $(lib_common) $(lib_client) $(lib_broker) +check-unit: + $(MAKE) check TESTS=run-unit-tests + # Create a copy so user can modify without risk of checking in their mods. .valgrindrc: .valgrindrc-default cp .valgrindrc-default .valgrindrc |