diff options
28 files changed, 612 insertions, 721 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 91ba3dfec0..fc61cd2296 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -49,14 +49,6 @@ BasicMessage::BasicMessage( size(0) {} -// FIXME aconway 2007-02-01: remove. -// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize) : -// publisher(0), size(0) -// { - -// decode(buffer, headersOnly, contentChunkSize); -// } - // For tests only. BasicMessage::BasicMessage() : size(0) {} @@ -227,12 +219,13 @@ void BasicMessage::releaseContent(MessageStore* store) store->stage(this); } if (!content.get() || content->size() > 0) { - // FIXME aconway 2007-02-07: handle MessageMessage. - //set content to lazy loading mode (but only if there is stored content): + //set content to lazy loading mode (but only if there is + //stored content): - //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is - // then set as a member of that message so its lifetime is guaranteed to be no longer than - // that of the message itself + //Note: the LazyLoadedContent instance contains a raw pointer + //to the message, however it is then set as a member of that + //message so its lifetime is guaranteed to be no longer than + //that of the message itself content = std::auto_ptr<Content>( new LazyLoadedContent(store, this, expectedContentSize())); } diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h index 7739ab19e0..4989cccdd3 100644 --- a/cpp/lib/broker/BrokerMessageBase.h +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -110,10 +110,6 @@ class Message { virtual bool isComplete() = 0; virtual uint64_t contentSize() const = 0; - // FIXME aconway 2007-02-06: Get rid of BasicHeaderProperties - // at this level. Expose only generic properties available from both - // message types (e.g. getApplicationHeaders below). - // virtual framing::BasicHeaderProperties* getHeaderProperties() = 0; virtual const framing::FieldTable& getApplicationHeaders() = 0; virtual bool isPersistent() = 0; diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp index 31309bd6c5..b65e8e3a9a 100644 --- a/cpp/lib/broker/BrokerQueue.cpp +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -234,10 +234,9 @@ void Queue::create(const FieldTable& settings) void Queue::configure(const FieldTable& settings) { - QueuePolicy* _policy = new QueuePolicy(settings); - if (_policy->getMaxCount() || _policy->getMaxSize()) { - setPolicy(std::auto_ptr<QueuePolicy>(_policy)); - } + std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(settings)); + if (_policy->getMaxCount() || _policy->getMaxSize()) + setPolicy(_policy); } void Queue::destroy() diff --git a/cpp/lib/broker/InMemoryContent.cpp b/cpp/lib/broker/InMemoryContent.cpp index 3e4ac29486..237375e860 100644 --- a/cpp/lib/broker/InMemoryContent.cpp +++ b/cpp/lib/broker/InMemoryContent.cpp @@ -40,7 +40,6 @@ uint32_t InMemoryContent::size() return sum; } -// FIXME aconway 2007-02-01: Remove version parameter. void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize) { for (content_iterator i = content.begin(); i != content.end(); i++) { diff --git a/cpp/lib/client/BasicMessageChannel.cpp b/cpp/lib/client/BasicMessageChannel.cpp index 012a55b9ea..d6929965ee 100644 --- a/cpp/lib/client/BasicMessageChannel.cpp +++ b/cpp/lib/client/BasicMessageChannel.cpp @@ -15,7 +15,6 @@ * limitations under the License. * */ -#include <iostream> #include "BasicMessageChannel.h" #include "AMQMethodBody.h" #include "ClientChannel.h" @@ -23,39 +22,93 @@ #include "MessageListener.h" #include "framing/FieldTable.h" #include "Connection.h" - -using namespace std; +#include <queue> +#include <iostream> +#include <boost/format.hpp> +#include <boost/variant.hpp> namespace qpid { namespace client { +using namespace std; using namespace sys; using namespace framing; +using boost::format; + +namespace { + +// Destination name constants +const std::string BASIC_GET("__basic_get__"); +const std::string BASIC_RETURN("__basic_return__"); + +// Reference name constant +const std::string BASIC_REF("__basic_reference__"); +} + +class BasicMessageChannel::WaitableDestination : + public IncomingMessage::Destination +{ + public: + WaitableDestination() : shutdownFlag(false) {} + void message(const Message& msg) { + Mutex::ScopedLock l(monitor); + queue.push(msg); + monitor.notify(); + } + + void empty() { + Mutex::ScopedLock l(monitor); + queue.push(Empty()); + monitor.notify(); + } + + bool wait(Message& msgOut) { + Mutex::ScopedLock l(monitor); + while (queue.empty() && !shutdownFlag) + monitor.wait(); + if (shutdownFlag) + return false; + Message* msg = boost::get<Message>(&queue.front()); + bool success = msg; + if (success) + msgOut=*msg; + queue.pop(); + if (!queue.empty()) + monitor.notify(); // Wake another waiter. + return success; + } + + void shutdown() { + Mutex::ScopedLock l(monitor); + shutdownFlag = true; + monitor.notifyAll(); + } + + private: + struct Empty {}; + typedef boost::variant<Message,Empty> Item; + sys::Monitor monitor; + std::queue<Item> queue; + bool shutdownFlag; +}; + BasicMessageChannel::BasicMessageChannel(Channel& ch) - : channel(ch), returnsHandler(0) {} + : channel(ch), returnsHandler(0), + destGet(new WaitableDestination()), + destDispatch(new WaitableDestination()) +{ + incoming.addDestination(BASIC_RETURN, *destDispatch); +} void BasicMessageChannel::consume( Queue& queue, std::string& tag, MessageListener* listener, AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { - channel.sendAndReceiveSync<BasicConsumeOkBody>( - synch, - new BasicConsumeBody( - channel.version, 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable())); - if (synch) { - BasicConsumeOkBody::shared_ptr response = - boost::shared_polymorphic_downcast<BasicConsumeOkBody>( - channel.responses.getResponse()); - tag = response->getConsumerTag(); - } - // FIXME aconway 2007-02-20: Race condition! - // We could receive the first message for the consumer - // before we create the consumer below. - // Move consumer creation to handler for BasicConsumeOkBody { + // Note we create a consumer even if tag="". In that case + // It will be renamed when we handle BasicConsumeOkBody. + // Mutex::ScopedLock l(lock); ConsumerMap::iterator i = consumers.find(tag); if (i != consumers.end()) @@ -66,6 +119,23 @@ void BasicMessageChannel::consume( c.ackMode = ackMode; c.lastDeliveryTag = 0; } + + // FIXME aconway 2007-03-23: get processed in both. + + // BasicConsumeOkBody is really processed in handle(), here + // we just pick up the tag to return to the user. + // + // We can't process it here because messages for the consumer may + // already be arriving. + // + BasicConsumeOkBody::shared_ptr ok = + channel.sendAndReceiveSync<BasicConsumeOkBody>( + synch, + new BasicConsumeBody( + channel.version, 0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, + fields ? *fields : FieldTable())); + tag = ok->getConsumerTag(); } @@ -92,6 +162,8 @@ void BasicMessageChannel::close(){ consumersCopy = consumers; consumers.clear(); } + destGet->shutdown(); + destDispatch->shutdown(); for (ConsumerMap::iterator i=consumersCopy.begin(); i != consumersCopy.end(); ++i) { @@ -102,28 +174,37 @@ void BasicMessageChannel::close(){ channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); } } - incoming.shutdown(); } - -bool BasicMessageChannel::get(Message& msg, const Queue& queue, AckMode ackMode) { - // Expect a message starting with a BasicGetOk - incoming.startGet(); - channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); - return incoming.waitGet(msg); +bool BasicMessageChannel::get( + Message& msg, const Queue& queue, AckMode ackMode) +{ + // Prepare for incoming response + incoming.addDestination(BASIC_GET, *destGet); + channel.send( + new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); + bool got = destGet->wait(msg); + return got; } void BasicMessageChannel::publish( const Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate) { - msg.getHeader()->setContentSize(msg.getData().size()); const string e = exchange.getName(); string key = routingKey; - channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate)); - //break msg up into header frame and content frame(s) and send these - channel.send(msg.getHeader()); + + // Make a header for the message + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + BasicHeaderProperties::copy( + *static_cast<BasicHeaderProperties*>(header->getProperties()), msg); + header->setContentSize(msg.getData().size()); + + channel.send( + new BasicPublishBody( + channel.version, 0, e, key, mandatory, immediate)); + channel.send(header); string data = msg.getData(); u_int64_t data_length = data.length(); if(data_length > 0){ @@ -149,22 +230,76 @@ void BasicMessageChannel::publish( void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { assert(method->amqpClassId() ==BasicGetBody::CLASS_ID); switch(method->amqpMethodId()) { - case BasicDeliverBody::METHOD_ID: - case BasicReturnBody::METHOD_ID: - case BasicGetOkBody::METHOD_ID: - case BasicGetEmptyBody::METHOD_ID: - incoming.add(method); - return; + case BasicGetOkBody::METHOD_ID: { + incoming.openReference(BASIC_REF); + incoming.createMessage(BASIC_GET, BASIC_REF); + return; + } + case BasicGetEmptyBody::METHOD_ID: { + incoming.getDestination(BASIC_GET).empty(); + incoming.removeDestination(BASIC_GET); + return; + } + case BasicDeliverBody::METHOD_ID: { + BasicDeliverBody::shared_ptr deliver= + boost::shared_polymorphic_downcast<BasicDeliverBody>(method); + incoming.openReference(BASIC_REF); + Message& msg = incoming.createMessage( + deliver->getConsumerTag(), BASIC_REF); + msg.setDestination(deliver->getConsumerTag()); + msg.setDeliveryTag(deliver->getDeliveryTag()); + msg.setRedelivered(deliver->getRedelivered()); + return; + } + case BasicReturnBody::METHOD_ID: { + incoming.openReference(BASIC_REF); + incoming.createMessage(BASIC_RETURN, BASIC_REF); + return; + } + case BasicConsumeOkBody::METHOD_ID: { + Mutex::ScopedLock l(lock); + BasicConsumeOkBody::shared_ptr consumeOk = + boost::shared_polymorphic_downcast<BasicConsumeOkBody>(method); + std::string tag = consumeOk->getConsumerTag(); + ConsumerMap::iterator i = consumers.find(std::string()); + if (i != consumers.end()) { + // Need to rename the un-named consumer. + if (consumers.find(tag) == consumers.end()) { + consumers[tag] = i->second; + consumers.erase(i); + } + else // Tag already exists. + throw ChannelException(404, "Tag already exists: "+tag); + } + // FIXME aconway 2007-03-23: Integrate consumer & destination + // maps. + incoming.addDestination(tag, *destDispatch); + return; + } } throw Channel::UnknownMethod(); } -void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr body){ - incoming.add(body); +void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr header) { + BasicHeaderProperties* props = + boost::polymorphic_downcast<BasicHeaderProperties*>( + header->getProperties()); + IncomingMessage::Reference& ref = incoming.getReference(BASIC_REF); + assert (ref.messages.size() == 1); + ref.messages.front().BasicHeaderProperties::operator=(*props); + incoming_size = header->getContentSize(); + if (incoming_size==0) + incoming.closeReference(BASIC_REF); } -void BasicMessageChannel::handle(AMQContentBody::shared_ptr body){ - incoming.add(body); +void BasicMessageChannel::handle(AMQContentBody::shared_ptr content){ + incoming.appendReference(BASIC_REF, content->getData()); + size_t size = incoming.getReference(BASIC_REF).data.size(); + if (size >= incoming_size) { + incoming.closeReference(BASIC_REF); + if (size > incoming_size) + throw ChannelException(502, "Content exceeded declared size"); + } } void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){ @@ -186,7 +321,9 @@ void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){ consumer.lastDeliveryTag = 0; channel.send( new BasicAckBody( - channel.version, msg.getDeliveryTag(), multiple)); + channel.version, + msg.getDeliveryTag(), + multiple)); case NO_ACK: // Nothing to do case CLIENT_ACK: // User code must ack. break; @@ -204,35 +341,32 @@ void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){ void BasicMessageChannel::run() { while(channel.isOpen()) { try { - Message msg = incoming.waitDispatch(); - if(msg.getMethod()->isA<BasicReturnBody>()) { - ReturnedMessageHandler* handler=0; - { - Mutex::ScopedLock l(lock); - handler=returnsHandler; - } - if(handler == 0) { - // TODO aconway 2007-02-20: proper logging. - cout << "Message returned: " << msg.getData() << endl; + Message msg; + bool gotMessge = destDispatch->wait(msg); + if (gotMessge) { + if(msg.getDestination() == BASIC_RETURN) { + ReturnedMessageHandler* handler=0; + { + Mutex::ScopedLock l(lock); + handler=returnsHandler; + } + if(handler != 0) + handler->returned(msg); } - else - handler->returned(msg); - } - else { - BasicDeliverBody::shared_ptr deliverBody = - boost::shared_polymorphic_downcast<BasicDeliverBody>( - msg.getMethod()); - std::string tag = deliverBody->getConsumerTag(); - Consumer consumer; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if(i == consumers.end()) - THROW_QPID_ERROR(PROTOCOL_ERROR+504, - "Unknown consumer tag=" + tag); - consumer = i->second; + else { + Consumer consumer; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find( + msg.getDestination()); + if(i == consumers.end()) + THROW_QPID_ERROR(PROTOCOL_ERROR+504, + "Unknown consumer tag=" + + msg.getDestination()); + consumer = i->second; + } + deliver(consumer, msg); } - deliver(consumer, msg); } } catch (const ShutdownException&) { @@ -240,8 +374,8 @@ void BasicMessageChannel::run() { } catch (const Exception& e) { // FIXME aconway 2007-02-20: Report exception to user. - cout << "client::Basic::run() terminated by: " << e.toString() - << "(" << typeid(e).name() << ")" << endl; + cout << "client::BasicMessageChannel::run() terminated by: " + << e.toString() << endl; } } } diff --git a/cpp/lib/client/BasicMessageChannel.h b/cpp/lib/client/BasicMessageChannel.h index b921ec24d9..aaedfd6bf1 100644 --- a/cpp/lib/client/BasicMessageChannel.h +++ b/cpp/lib/client/BasicMessageChannel.h @@ -21,6 +21,7 @@ #include "MessageChannel.h" #include "IncomingMessage.h" +#include <boost/scoped_ptr.hpp> namespace qpid { namespace client { @@ -62,13 +63,13 @@ class BasicMessageChannel : public MessageChannel private: + class WaitableDestination; struct Consumer{ MessageListener* listener; AckMode ackMode; int count; u_int64_t lastDeliveryTag; }; - typedef std::map<std::string, Consumer> ConsumerMap; void deliver(Consumer& consumer, Message& msg); @@ -76,8 +77,11 @@ class BasicMessageChannel : public MessageChannel sys::Mutex lock; Channel& channel; IncomingMessage incoming; - ConsumerMap consumers; + uint64_t incoming_size; + ConsumerMap consumers ; ReturnedMessageHandler* returnsHandler; + boost::scoped_ptr<WaitableDestination> destGet; + boost::scoped_ptr<WaitableDestination> destDispatch; }; }} // namespace qpid::client diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index 97e0a394d2..98feff9389 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -68,7 +68,8 @@ void Channel::protocolInit( assert(connection); responses.expect(); connection->connector->init(); // Send ProtocolInit block. - responses.receive<ConnectionStartBody>(); + ConnectionStartBody::shared_ptr connectionStart = + responses.receive<ConnectionStartBody>(); FieldTable props; string mechanism("PLAIN"); @@ -77,7 +78,8 @@ void Channel::protocolInit( ConnectionTuneBody::shared_ptr proposal = sendAndReceive<ConnectionTuneBody>( new ConnectionStartOkBody( - version, responses.getRequestId(), props, mechanism, + version, connectionStart->getRequestId(), + props, mechanism, response, locale)); /** @@ -89,7 +91,8 @@ void Channel::protocolInit( **/ send(new ConnectionTuneOkBody( - version, responses.getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(), + version, proposal->getRequestId(), + proposal->getChannelMax(), connection->getMaxFrameSize(), proposal->getHeartbeat())); uint16_t heartbeat = proposal->getHeartbeat(); @@ -102,18 +105,17 @@ void Channel::protocolInit( send(new ConnectionOpenBody(version, vhost, capabilities, true)); //receive connection.open-ok (or redirect, but ignore that for now //esp. as using force=true). - responses.waitForResponse(); - if(responses.validate<ConnectionOpenOkBody>()) { + AMQMethodBody::shared_ptr openResponse = responses.receive(); + if(openResponse->isA<ConnectionOpenOkBody>()) { //ok - }else if(responses.validate<ConnectionRedirectBody>()){ + }else if(openResponse->isA<ConnectionRedirectBody>()){ //ignore for now ConnectionRedirectBody::shared_ptr redirect( - shared_polymorphic_downcast<ConnectionRedirectBody>( - responses.getResponse())); + shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse)); cout << "Received redirection to " << redirect->getHost() << endl; } else { - THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); + THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open"); } } @@ -121,7 +123,6 @@ bool Channel::isOpen() const { return connection; } void Channel::setQos() { messaging->setQos(); - // FIXME aconway 2007-02-22: message } void Channel::setPrefetch(uint16_t _prefetch){ @@ -149,18 +150,15 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){ void Channel::declareQueue(Queue& queue, bool synch){ string name = queue.getName(); FieldTable args; - sendAndReceiveSync<QueueDeclareOkBody>( - synch, - new QueueDeclareBody( - version, 0, name, false/*passive*/, queue.isDurable(), - queue.isExclusive(), queue.isAutoDelete(), !synch, args)); - if(synch){ - if(queue.getName().length() == 0){ - QueueDeclareOkBody::shared_ptr response = - shared_polymorphic_downcast<QueueDeclareOkBody>( - responses.getResponse()); + QueueDeclareOkBody::shared_ptr response = + sendAndReceiveSync<QueueDeclareOkBody>( + synch, + new QueueDeclareBody( + version, 0, name, false/*passive*/, queue.isDurable(), + queue.isExclusive(), queue.isAutoDelete(), !synch, args)); + if(synch) { + if(queue.getName().length() == 0) queue.setName(response->getQueue()); - } } } @@ -191,6 +189,14 @@ void Channel::rollback(){ void Channel::handleMethodInContext( AMQMethodBody::shared_ptr method, const MethodContext&) { + // TODO aconway 2007-03-23: Special case for consume OK as it + // is both an expected response and needs handling in this thread. + // Need to review & reationalize the client-side processing model. + if (method->isA<BasicConsumeOkBody>()) { + messaging->handle(method); + responses.signalResponse(method); + return; + } if(responses.isWaiting()) { responses.signalResponse(method); return; @@ -204,11 +210,11 @@ void Channel::handleMethodInContext( } } catch (const UnknownMethod&) { - connection->close( - 504, "Unknown method", - method->amqpClassId(), method->amqpMethodId()); - } -} + connection->close( + 504, "Unknown method", + method->amqpClassId(), method->amqpMethodId()); + } + } void Channel::handleChannel(AMQMethodBody::shared_ptr method) { switch (method->amqpMethodId()) { @@ -272,8 +278,6 @@ void Channel::close( void Channel::peerClose(ChannelCloseBody::shared_ptr) { assert(isOpen()); closeInternal(); - // FIXME aconway 2007-01-26: How to throw the proper exception - // to the application thread? } void Channel::closeInternal() { @@ -287,20 +291,23 @@ void Channel::closeInternal() { dispatcher.join(); } -void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m) +AMQMethodBody::shared_ptr Channel::sendAndReceive( + AMQMethodBody* toSend, ClassId c, MethodId m) { responses.expect(); send(toSend); - responses.receive(c, m); + return responses.receive(c, m); } -void Channel::sendAndReceiveSync( +AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( bool sync, AMQMethodBody* body, ClassId c, MethodId m) { if(sync) - sendAndReceive(body, c, m); - else + return sendAndReceive(body, c, m); + else { send(body); + return AMQMethodBody::shared_ptr(); + } } void Channel::consume( diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index 58a007977d..f6fe18da20 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -81,24 +81,25 @@ class Channel : public framing::ChannelAdapter const std::string& uid, const std::string& pwd, const std::string& vhost); - void sendAndReceive( + framing::AMQMethodBody::shared_ptr sendAndReceive( framing::AMQMethodBody*, framing::ClassId, framing::MethodId); - void sendAndReceiveSync( + framing::AMQMethodBody::shared_ptr sendAndReceiveSync( bool sync, framing::AMQMethodBody*, framing::ClassId, framing::MethodId); template <class BodyType> boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) { - sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID); return boost::shared_polymorphic_downcast<BodyType>( - responses.getResponse()); + sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID)); } - template <class BodyType> void sendAndReceiveSync( + template <class BodyType> + boost::shared_ptr<BodyType> sendAndReceiveSync( bool sync, framing::AMQMethodBody* body) { - sendAndReceiveSync( - sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID); + return boost::shared_polymorphic_downcast<BodyType>( + sendAndReceiveSync( + sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID)); } void open(framing::ChannelId, Connection&); diff --git a/cpp/lib/client/ClientMessage.cpp b/cpp/lib/client/ClientMessage.cpp deleted file mode 100644 index 8edd0a474d..0000000000 --- a/cpp/lib/client/ClientMessage.cpp +++ /dev/null @@ -1,161 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ClientMessage.h> -using namespace qpid::client; -using namespace qpid::framing; - -Message::Message(const std::string& d) - : header(new AMQHeaderBody(BASIC)) -{ - setData(d); -} - -void Message::setData(const std::string& d) { - data = d; -} - -Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ -} - -Message::~Message(){ -} - -BasicHeaderProperties* Message::getHeaderProperties() const { - return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); -} - -std::string Message::getContentType() const { - return getHeaderProperties()->getContentType(); -} - -std::string Message::getContentEncoding() const { - return getHeaderProperties()->getContentEncoding(); -} - -FieldTable& Message::getHeaders() const { - return getHeaderProperties()->getHeaders(); -} - -uint8_t Message::getDeliveryMode() const { - return getHeaderProperties()->getDeliveryMode(); -} - -uint8_t Message::getPriority() const { - return getHeaderProperties()->getPriority(); -} - -std::string Message::getCorrelationId() const { - return getHeaderProperties()->getCorrelationId(); -} - -std::string Message::getReplyTo() const { - return getHeaderProperties()->getReplyTo(); -} - -std::string Message::getExpiration() const { - return getHeaderProperties()->getExpiration(); -} - -std::string Message::getMessageId() const { - return getHeaderProperties()->getMessageId(); -} - -uint64_t Message::getTimestamp() const { - return getHeaderProperties()->getTimestamp(); -} - -std::string Message::getType() const { - return getHeaderProperties()->getType(); -} - -std::string Message::getUserId() const { - return getHeaderProperties()->getUserId(); -} - -std::string Message::getAppId() const { - return getHeaderProperties()->getAppId(); -} - -std::string Message::getClusterId() const { - return getHeaderProperties()->getClusterId(); -} - -void Message::setContentType(const std::string& type){ - getHeaderProperties()->setContentType(type); -} - -void Message::setContentEncoding(const std::string& encoding){ - getHeaderProperties()->setContentEncoding(encoding); -} - -void Message::setHeaders(const FieldTable& headers){ - getHeaderProperties()->setHeaders(headers); -} - -void Message::setDeliveryMode(DeliveryMode mode){ - getHeaderProperties()->setDeliveryMode(mode); -} - -void Message::setPriority(uint8_t priority){ - getHeaderProperties()->setPriority(priority); -} - -void Message::setCorrelationId(const std::string& correlationId){ - getHeaderProperties()->setCorrelationId(correlationId); -} - -void Message::setReplyTo(const std::string& replyTo){ - getHeaderProperties()->setReplyTo(replyTo); -} - -void Message::setExpiration(const std::string& expiration){ - getHeaderProperties()->setExpiration(expiration); -} - -void Message::setMessageId(const std::string& messageId){ - getHeaderProperties()->setMessageId(messageId); -} - -void Message::setTimestamp(uint64_t timestamp){ - getHeaderProperties()->setTimestamp(timestamp); -} - -void Message::setType(const std::string& type){ - getHeaderProperties()->setType(type); -} - -void Message::setUserId(const std::string& userId){ - getHeaderProperties()->setUserId(userId); -} - -void Message::setAppId(const std::string& appId){ - getHeaderProperties()->setAppId(appId); -} - -void Message::setClusterId(const std::string& clusterId){ - getHeaderProperties()->setClusterId(clusterId); -} - - -uint64_t Message::getDeliveryTag() const { - BasicDeliverBody* deliver=dynamic_cast<BasicDeliverBody*>(method.get()); - return deliver ? deliver->getDeliveryTag() : 0; -} diff --git a/cpp/lib/client/ClientMessage.h b/cpp/lib/client/ClientMessage.h index c89eeb1a0d..a326c24aed 100644 --- a/cpp/lib/client/ClientMessage.h +++ b/cpp/lib/client/ClientMessage.h @@ -22,12 +22,10 @@ * */ #include <string> -#include <framing/amqp_framing.h> +#include "framing/BasicHeaderProperties.h" namespace qpid { - namespace client { -class IncomingMessage; /** * A representation of messages for sent or recived through the @@ -35,67 +33,28 @@ class IncomingMessage; * * \ingroup clientapi */ -class Message { - framing::AMQMethodBody::shared_ptr method; - framing::AMQHeaderBody::shared_ptr header; - std::string data; - bool redelivered; - - // FIXME aconway 2007-02-20: const incorrect, needs const return type. - framing::BasicHeaderProperties* getHeaderProperties() const; - Message(qpid::framing::AMQHeaderBody::shared_ptr& header); - +class Message : public framing::BasicHeaderProperties { public: - enum DeliveryMode { DURABLE=1, NON_DURABLE=2 }; - Message(const std::string& data=std::string()); - ~Message(); + Message(const std::string& data_=std::string()) : data(data_) {} std::string getData() const { return data; } - bool isRedelivered() const { return redelivered; } - uint64_t getDeliveryTag() const; - std::string getContentType() const; - std::string getContentEncoding() const; - qpid::framing::FieldTable& getHeaders() const; - uint8_t getDeliveryMode() const; - uint8_t getPriority() const; - std::string getCorrelationId() const; - std::string getReplyTo() const; - std::string getExpiration() const; - std::string getMessageId() const; - uint64_t getTimestamp() const; - std::string getType() const; - std::string getUserId() const; - std::string getAppId() const; - std::string getClusterId() const; + void setData(const std::string& _data) { data = _data; } - void setData(const std::string& _data); - void setRedelivered(bool _redelivered){ redelivered = _redelivered; } - void setContentType(const std::string& type); - void setContentEncoding(const std::string& encoding); - void setHeaders(const qpid::framing::FieldTable& headers); - void setDeliveryMode(DeliveryMode mode); - void setPriority(uint8_t priority); - void setCorrelationId(const std::string& correlationId); - void setReplyTo(const std::string& replyTo); - void setExpiration(const std::string& expiration); - void setMessageId(const std::string& messageId); - void setTimestamp(uint64_t timestamp); - void setType(const std::string& type); - void setUserId(const std::string& userId); - void setAppId(const std::string& appId); - void setClusterId(const std::string& clusterId); + std::string getDestination() const { return destination; } + void setDestination(const std::string& dest) { destination = dest; } - /** Get the method used to deliver this message */ - boost::shared_ptr<framing::AMQMethodBody> getMethod() const - { return method; } - - void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; } - boost::shared_ptr<framing::AMQHeaderBody> getHeader() const - { return header; } + // TODO aconway 2007-03-22: only needed for Basic.deliver support. + uint64_t getDeliveryTag() const { return deliveryTag; } + void setDeliveryTag(uint64_t dt) { deliveryTag = dt; } - // TODO aconway 2007-02-15: remove friendships. - friend class IncomingMessage; - friend class Channel; + bool isRedelivered() const { return redelivered; } + void setRedelivered(bool _redelivered){ redelivered = _redelivered; } + + private: + std::string data; + std::string destination; + bool redelivered; + uint64_t deliveryTag; }; }} diff --git a/cpp/lib/client/IncomingMessage.cpp b/cpp/lib/client/IncomingMessage.cpp index 07f94ceb64..8f69f8c3ef 100644 --- a/cpp/lib/client/IncomingMessage.cpp +++ b/cpp/lib/client/IncomingMessage.cpp @@ -18,155 +18,113 @@ * under the License. * */ -#include <IncomingMessage.h> -#include "framing/AMQHeaderBody.h" -#include "framing/AMQContentBody.h" -#include "BasicGetOkBody.h" -#include "BasicReturnBody.h" -#include "BasicDeliverBody.h" -#include <QpidError.h> -#include <iostream> + +#include "IncomingMessage.h" +#include "Exception.h" +#include "ClientMessage.h" +#include <boost/format.hpp> namespace qpid { namespace client { -using namespace sys; -using namespace framing; - -struct IncomingMessage::Guard: public Mutex::ScopedLock { - Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) { - im->shutdownError.throwIf(); - } -}; - -IncomingMessage::IncomingMessage() { reset(); } +using boost::format; +using sys::Mutex; -void IncomingMessage::reset() { - state = &IncomingMessage::expectRequest; - endFn= &IncomingMessage::endRequest; - buildMessage = Message(); -} - -void IncomingMessage::startGet() { - Guard g(this); - if (state != &IncomingMessage::expectRequest) { - endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress.")); - } - else { - state = &IncomingMessage::expectGetOk; - endFn = &IncomingMessage::endGet; - getError.reset(); - getState = GETTING; - } -} - -bool IncomingMessage::waitGet(Message& msg) { - Guard g(this); - while (getState == GETTING && !shutdownError && !getError) - getReady.wait(lock); - shutdownError.throwIf(); - getError.throwIf(); - msg = getMessage; - return getState==GOT; -} - -Message IncomingMessage::waitDispatch() { - Guard g(this); - while(dispatchQueue.empty() && !shutdownError) - dispatchReady.wait(lock); - shutdownError.throwIf(); - - Message msg(dispatchQueue.front()); - dispatchQueue.pop(); - return msg; -} +IncomingMessage::Destination::~Destination() {} -void IncomingMessage::add(BodyPtr body) { - Guard g(this); - shutdownError.throwIf(); - // Call the current state function. - (this->*state)(body); -} - -void IncomingMessage::shutdown() { +void IncomingMessage::openReference(const std::string& name) { Mutex::ScopedLock l(lock); - shutdownError.reset(new ShutdownException()); - getReady.notify(); - dispatchReady.notify(); + if (references.find(name) != references.end()) + throw ChannelException( + 406, format("Attempt to open existing reference %s.") % name); + references[name]; + return; } -bool IncomingMessage::isShutdown() const { +void IncomingMessage::appendReference( + const std::string& name, const std::string& data) +{ Mutex::ScopedLock l(lock); - return shutdownError; + getRefUnlocked(name).data += data; } -// Common check for all the expect functions. Called in network thread. -template<class T> -boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) { - boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body); - if (!ptr) - throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); - return ptr; +Message& IncomingMessage::createMessage( + const std::string& destination, const std::string& reference) +{ + Mutex::ScopedLock l(lock); + getDestUnlocked(destination); // Verify destination. + Reference& ref = getRefUnlocked(reference); + ref.messages.resize(ref.messages.size() +1); + ref.messages.back().setDestination(destination); + return ref.messages.back(); } -void IncomingMessage::expectGetOk(BodyPtr body) { - if (dynamic_cast<BasicGetOkBody*>(body.get())) - state = &IncomingMessage::expectHeader; - else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) { - getState = EMPTY; - endGet(); +void IncomingMessage::closeReference(const std::string& name) { + Reference refCopy; + { + Mutex::ScopedLock l(lock); + refCopy = getRefUnlocked(name); + references.erase(name); + } + for (std::vector<Message>::iterator i = refCopy.messages.begin(); + i != refCopy.messages.end(); + ++i) + { + i->setData(refCopy.data); + // TODO aconway 2007-03-23: Thread safety, + // can a destination be removed while we're doing this? + getDestination(i->getDestination()).message(*i); } - else - throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); } -void IncomingMessage::expectHeader(BodyPtr body) { - AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body); - buildMessage.header = header; - state = &IncomingMessage::expectContent; - checkComplete(); + +void IncomingMessage::addDestination(std::string name, Destination& dest) { + Mutex::ScopedLock l(lock); + DestinationMap::iterator i = destinations.find(name); + if (i == destinations.end()) + destinations[name]=&dest; + else if (i->second != &dest) + throw ChannelException( + 404, format("Destination already exists: %s.") % name); } -void IncomingMessage::expectContent(BodyPtr body) { - AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body); - buildMessage.setData(buildMessage.getData() + content->getData()); - checkComplete(); +void IncomingMessage::removeDestination(std::string name) { + Mutex::ScopedLock l(lock); + DestinationMap::iterator i = destinations.find(name); + if (i == destinations.end()) + throw ChannelException( + 406, format("No such destination: %s.") % name); + destinations.erase(i); } -void IncomingMessage::checkComplete() { - size_t declaredSize = buildMessage.header->getContentSize(); - size_t currentSize = buildMessage.getData().size(); - if (declaredSize == currentSize) - (this->*endFn)(0); - else if (declaredSize < currentSize) - (this->*endFn)(new QPID_ERROR( - PROTOCOL_ERROR, "Message content exceeds declared size.")); +IncomingMessage::Destination& IncomingMessage::getDestination( + const std::string& name) { + return getDestUnlocked(name); } -void IncomingMessage::expectRequest(BodyPtr body) { - AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body); - buildMessage.setMethod(method); - state = &IncomingMessage::expectHeader; +IncomingMessage::Reference& IncomingMessage::getReference( + const std::string& name) { + return getRefUnlocked(name); } - -void IncomingMessage::endGet(Exception* ex) { - getError.reset(ex); - if (getState == GETTING) { - getMessage = buildMessage; - getState = GOT; - } - reset(); - getReady.notify(); + +IncomingMessage::Reference& IncomingMessage::getRefUnlocked( + const std::string& name) { + Mutex::ScopedLock l(lock); + ReferenceMap::iterator i = references.find(name); + if (i == references.end()) + throw ChannelException( + 404, format("No such reference: %s.") % name); + return i->second; } -void IncomingMessage::endRequest(Exception* ex) { - ExceptionHolder eh(ex); - if (!eh) { - dispatchQueue.push(buildMessage); - reset(); - dispatchReady.notify(); - } - eh.throwIf(); +IncomingMessage::Destination& IncomingMessage::getDestUnlocked( + const std::string& name) { + Mutex::ScopedLock l(lock); + DestinationMap::iterator i = destinations.find(name); + if (i == destinations.end()) + throw ChannelException( + 404, format("No such destination: %s.") % name); + return *i->second; } }} // namespace qpid::client diff --git a/cpp/lib/client/IncomingMessage.h b/cpp/lib/client/IncomingMessage.h index 6ec949028d..d78a90327d 100644 --- a/cpp/lib/client/IncomingMessage.h +++ b/cpp/lib/client/IncomingMessage.h @@ -21,96 +21,91 @@ * under the License. * */ -#include <string> -#include <queue> -#include <framing/amqp_framing.h> -#include "ExceptionHolder.h" -#include "ClientMessage.h" #include "sys/Mutex.h" -#include "sys/Condition.h" +#include <map> +#include <vector> + namespace qpid { +namespace client { -namespace framing { -class AMQBody; -} +class Message; -namespace client { /** - * Accumulates incoming message frames into messages. - * Client-initiated messages (basic.get) are initiated and made - * available to the user thread one at a time. - * - * Broker initiated messages (basic.return, basic.deliver) are - * queued for handling by the user dispatch thread. - * + * Manage incoming messages. + * + * Uses reference and destination concepts from 0-9 Messsage class. + * + * Basic messages use special destination and reference names to indicate + * get-ok, return etc. messages. + * */ class IncomingMessage { public: - typedef boost::shared_ptr<framing::AMQBody> BodyPtr; - IncomingMessage(); - - /** Expect a new message starting with getOk. Called in user thread.*/ - void startGet(); + /** Accumulate data associated with a set of messages. */ + struct Reference { + std::string data; + std::vector<Message> messages; + }; - /** Wait for the message to complete, return the message. - * Called in user thread. - *@raises QpidError if there was an error. - */ - bool waitGet(Message&); + /** Interface to a destination for messages. */ + class Destination { + public: + virtual ~Destination(); - /** Wait for the next broker-initiated message. */ - Message waitDispatch(); + /** Pass a message to the destination */ + virtual void message(const Message&) = 0; - /** Add a frame body to the message. Called in network thread. */ - void add(BodyPtr); + /** Notify destination of queue-empty contition */ + virtual void empty() = 0; + }; - /** Shut down: all further calls to any function throw ex. */ - void shutdown(); - /** Check if shutdown */ - bool isShutdown() const; + /** Add a reference. Throws if already open. */ + void openReference(const std::string& name); - private: + /** Get a reference. Throws if not already open. */ + void appendReference(const std::string& name, + const std::string& data); - typedef void (IncomingMessage::* ExpectFn)(BodyPtr); - typedef void (IncomingMessage::* EndFn)(Exception*); - typedef std::queue<Message> MessageQueue; - struct Guard; - friend struct Guard; + /** Create a message to destination associated with reference + *@exception if destination or reference non-existent. + */ + Message& createMessage(const std::string& destination, + const std::string& reference); - void reset(); - template <class T> boost::shared_ptr<T> expectCheck(BodyPtr); + /** Get a reference. + *@exception if non-existent. + */ + Reference& getReference(const std::string& name); + + /** Close a reference and deliver all its messages. + * Throws if not open or a message has an invalid destination. + */ + void closeReference(const std::string& name); - // State functions - a state machine where each state is - // a member function that processes a frame body. - void expectGetOk(BodyPtr); - void expectHeader(BodyPtr); - void expectContent(BodyPtr); - void expectRequest(BodyPtr); + /** Add a destination. + *@exception if a different Destination is already registered + * under name. + */ + void addDestination(std::string name, Destination&); + + /** Remove a destination. Throws if does not exist */ + void removeDestination(std::string name); - // End functions. - void endGet(Exception* ex = 0); - void endRequest(Exception* ex); + /** Get a destination. Throws if does not exist */ + Destination& getDestination(const std::string& name); + private: - // Check for complete message. - void checkComplete(); + typedef std::map<std::string, Reference> ReferenceMap; + typedef std::map<std::string, Destination*> DestinationMap; + Reference& getRefUnlocked(const std::string& name); + Destination& getDestUnlocked(const std::string& name); + mutable sys::Mutex lock; - ExpectFn state; - EndFn endFn; - Message buildMessage; - ExceptionHolder shutdownError; - - // For basic.get messages. - sys::Condition getReady; - ExceptionHolder getError; - Message getMessage; - enum { GETTING, GOT, EMPTY } getState; - - // For broker-initiated messages - sys::Condition dispatchReady; - MessageQueue dispatchQueue; + ReferenceMap references; + DestinationMap destinations; }; }} diff --git a/cpp/lib/client/Makefile.am b/cpp/lib/client/Makefile.am index 46d8775072..6dbe195232 100644 --- a/cpp/lib/client/Makefile.am +++ b/cpp/lib/client/Makefile.am @@ -12,7 +12,6 @@ libqpidclient_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG) libqpidclient_la_SOURCES = \ ClientChannel.cpp \ ClientExchange.cpp \ - ClientMessage.cpp \ ClientQueue.cpp \ BasicMessageChannel.cpp \ Connection.cpp \ diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp index 4498de41ae..926a9ce336 100644 --- a/cpp/lib/client/ResponseHandler.cpp +++ b/cpp/lib/client/ResponseHandler.cpp @@ -18,12 +18,10 @@ * under the License. * */ -#include <boost/format.hpp> - -#include <ResponseHandler.h> -#include <sys/Monitor.h> #include <QpidError.h> -#include "amqp_types.h" +#include <boost/format.hpp> +#include "ResponseHandler.h" +#include "AMQMethodBody.h" using namespace qpid::sys; using namespace qpid::framing; @@ -31,56 +29,51 @@ using namespace qpid::framing; namespace qpid { namespace client { -ResponseHandler::ResponseHandler() : waiting(false){} +ResponseHandler::ResponseHandler() : waiting(false), shutdownFlag(false) {} ResponseHandler::~ResponseHandler(){} -bool ResponseHandler::validate(ClassId c, MethodId m) { - return response != 0 && - response->amqpClassId() ==c && response->amqpMethodId() == m; +bool ResponseHandler::isWaiting() { + Monitor::ScopedLock l(monitor); + return waiting; } -void ResponseHandler::waitForResponse(){ +void ResponseHandler::expect(){ Monitor::ScopedLock l(monitor); - while (waiting) - monitor.wait(); + waiting = true; } -void ResponseHandler::signalResponse( - qpid::framing::AMQMethodBody::shared_ptr _response) +void ResponseHandler::signalResponse(MethodPtr _response) { Monitor::ScopedLock l(monitor); response = _response; + if (!response) + shutdownFlag=true; waiting = false; monitor.notify(); } -void ResponseHandler::receive(ClassId c, MethodId m) { +ResponseHandler::MethodPtr ResponseHandler::receive() { Monitor::ScopedLock l(monitor); - while (waiting) + while (!response && !shutdownFlag) monitor.wait(); - getResponse(); // Check for closed. - if(!validate(response->amqpClassId(), response->amqpMethodId())) { + if (shutdownFlag) + THROW_QPID_ERROR( + PROTOCOL_ERROR, "Channel closed unexpectedly."); + MethodPtr result = response; + response.reset(); + return result; +} + +ResponseHandler::MethodPtr ResponseHandler::receive(ClassId c, MethodId m) { + MethodPtr response = receive(); + if(c != response->amqpClassId() || m != response->amqpMethodId()) { THROW_QPID_ERROR( PROTOCOL_ERROR, boost::format("Expected class:method %d:%d, got %d:%d") % c % m % response->amqpClassId() % response->amqpMethodId()); } -} - -framing::AMQMethodBody::shared_ptr ResponseHandler::getResponse() { - if (!response) - THROW_QPID_ERROR( - PROTOCOL_ERROR, "Channel closed unexpectedly."); return response; } -RequestId ResponseHandler::getRequestId() { - assert(response->getRequestId()); - return response->getRequestId(); -} -void ResponseHandler::expect(){ - waiting = true; -} - }} // namespace qpid::client diff --git a/cpp/lib/client/ResponseHandler.h b/cpp/lib/client/ResponseHandler.h index d28048c3d3..500166144d 100644 --- a/cpp/lib/client/ResponseHandler.h +++ b/cpp/lib/client/ResponseHandler.h @@ -18,51 +18,58 @@ * under the License. * */ -#include <string> - -#include <framing/amqp_framing.h> // FIXME aconway 2007-02-01: #include cleanup. +#include "shared_ptr.h" #include <sys/Monitor.h> #ifndef _ResponseHandler_ #define _ResponseHandler_ namespace qpid { + +namespace framing { +class AMQMethodBody; +} + namespace client { /** * Holds a response from the broker peer for the client. */ class ResponseHandler{ + typedef shared_ptr<framing::AMQMethodBody> MethodPtr; bool waiting; - qpid::framing::AMQMethodBody::shared_ptr response; - qpid::sys::Monitor monitor; + bool shutdownFlag; + MethodPtr response; + sys::Monitor monitor; public: ResponseHandler(); ~ResponseHandler(); - - bool isWaiting(){ return waiting; } - framing::AMQMethodBody::shared_ptr getResponse(); - void waitForResponse(); - - void signalResponse(framing::AMQMethodBody::shared_ptr response); - void expect();//must be called before calling receive - bool validate(framing::ClassId, framing::MethodId); - void receive(framing::ClassId, framing::MethodId); + /** Is a response expected? */ + bool isWaiting(); - framing::RequestId getRequestId(); + /** Provide a response to the waiting thread */ + void signalResponse(MethodPtr response); - template <class BodyType> bool validate() { - return validate(BodyType::CLASS_ID, BodyType::METHOD_ID); - } - template <class BodyType> void receive() { - receive(BodyType::CLASS_ID, BodyType::METHOD_ID); + /** Indicate a message is expected. */ + void expect(); + + /** Wait for a response. */ + MethodPtr receive(); + + /** Wait for a specific response. */ + MethodPtr receive(framing::ClassId, framing::MethodId); + + /** Template version of receive returns typed pointer. */ + template <class BodyType> + shared_ptr<BodyType> receive() { + return shared_polymorphic_downcast<BodyType>( + receive(BodyType::CLASS_ID, BodyType::METHOD_ID)); } }; -} -} +}} #endif diff --git a/cpp/lib/common/framing/BasicHeaderProperties.cpp b/cpp/lib/common/framing/BasicHeaderProperties.cpp index 930ec9f4dd..d815d1e62f 100644 --- a/cpp/lib/common/framing/BasicHeaderProperties.cpp +++ b/cpp/lib/common/framing/BasicHeaderProperties.cpp @@ -22,7 +22,7 @@ //TODO: This could be easily generated from the spec -qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(0), priority(0), timestamp(0){} +qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), priority(0), timestamp(0){} qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){} uint32_t qpid::framing::BasicHeaderProperties::size() const{ @@ -70,7 +70,7 @@ void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, if(flags & (1 << 15)) buffer.getShortString(contentType); if(flags & (1 << 14)) buffer.getShortString(contentEncoding); if(flags & (1 << 13)) buffer.getFieldTable(headers); - if(flags & (1 << 12)) deliveryMode = buffer.getOctet(); + if(flags & (1 << 12)) deliveryMode = DeliveryMode(buffer.getOctet()); if(flags & (1 << 11)) priority = buffer.getOctet(); if(flags & (1 << 10)) buffer.getShortString(correlationId); if(flags & (1 << 9)) buffer.getShortString(replyTo); diff --git a/cpp/lib/common/framing/BasicHeaderProperties.h b/cpp/lib/common/framing/BasicHeaderProperties.h index 316e67b82c..248014aefb 100644 --- a/cpp/lib/common/framing/BasicHeaderProperties.h +++ b/cpp/lib/common/framing/BasicHeaderProperties.h @@ -28,70 +28,88 @@ namespace qpid { namespace framing { - enum delivery_mode {TRANSIENT = 1, PERSISTENT = 2}; - //TODO: This could be easily generated from the spec - class BasicHeaderProperties : public HeaderProperties - { - string contentType; - string contentEncoding; - FieldTable headers; - uint8_t deliveryMode; - uint8_t priority; - string correlationId; - string replyTo; - string expiration; - string messageId; - uint64_t timestamp; - string type; - string userId; - string appId; - string clusterId; - - uint16_t getFlags() const; - - public: - BasicHeaderProperties(); - virtual ~BasicHeaderProperties(); - virtual uint32_t size() const; - virtual void encode(Buffer& buffer) const; - virtual void decode(Buffer& buffer, uint32_t size); +enum DeliveryMode { TRANSIENT = 1, PERSISTENT = 2}; - virtual uint8_t classId() { return BASIC; } +class BasicHeaderProperties : public HeaderProperties +{ + string contentType; + string contentEncoding; + FieldTable headers; + DeliveryMode deliveryMode; + uint8_t priority; + string correlationId; + string replyTo; + string expiration; + string messageId; + uint64_t timestamp; + string type; + string userId; + string appId; + string clusterId; + + uint16_t getFlags() const; - string getContentType() const { return contentType; } - string getContentEncoding() const { return contentEncoding; } - FieldTable& getHeaders() { return headers; } - uint8_t getDeliveryMode() const { return deliveryMode; } - uint8_t getPriority() const { return priority; } - string getCorrelationId() const {return correlationId; } - string getReplyTo() const { return replyTo; } - string getExpiration() const { return expiration; } - string getMessageId() const {return messageId; } - uint64_t getTimestamp() const { return timestamp; } - string getType() const { return type; } - string getUserId() const { return userId; } - string getAppId() const { return appId; } - string getClusterId() const { return clusterId; } + public: + BasicHeaderProperties(); + virtual ~BasicHeaderProperties(); + virtual uint32_t size() const; + virtual void encode(Buffer& buffer) const; + virtual void decode(Buffer& buffer, uint32_t size); - void setContentType(const string& _type){ contentType = _type; } - void setContentEncoding(const string& encoding){ contentEncoding = encoding; } - void setHeaders(const FieldTable& _headers){ headers = _headers; } - void setDeliveryMode(uint8_t mode){ deliveryMode = mode; } - void setPriority(uint8_t _priority){ priority = _priority; } - void setCorrelationId(const string& _correlationId){ correlationId = _correlationId; } - void setReplyTo(const string& _replyTo){ replyTo = _replyTo;} - void setExpiration(const string& _expiration){ expiration = _expiration; } - void setMessageId(const string& _messageId){ messageId = _messageId; } - void setTimestamp(uint64_t _timestamp){ timestamp = _timestamp; } - void setType(const string& _type){ type = _type; } - void setUserId(const string& _userId){ userId = _userId; } - void setAppId(const string& _appId){appId = _appId; } - void setClusterId(const string& _clusterId){ clusterId = _clusterId; } - }; + virtual uint8_t classId() { return BASIC; } -} -} + string getContentType() const { return contentType; } + string getContentEncoding() const { return contentEncoding; } + FieldTable& getHeaders() { return headers; } + const FieldTable& getHeaders() const { return headers; } + DeliveryMode getDeliveryMode() const { return deliveryMode; } + uint8_t getPriority() const { return priority; } + string getCorrelationId() const {return correlationId; } + string getReplyTo() const { return replyTo; } + string getExpiration() const { return expiration; } + string getMessageId() const {return messageId; } + uint64_t getTimestamp() const { return timestamp; } + string getType() const { return type; } + string getUserId() const { return userId; } + string getAppId() const { return appId; } + string getClusterId() const { return clusterId; } + void setContentType(const string& _type){ contentType = _type; } + void setContentEncoding(const string& encoding){ contentEncoding = encoding; } + void setHeaders(const FieldTable& _headers){ headers = _headers; } + void setDeliveryMode(DeliveryMode mode){ deliveryMode = mode; } + void setPriority(uint8_t _priority){ priority = _priority; } + void setCorrelationId(const string& _correlationId){ correlationId = _correlationId; } + void setReplyTo(const string& _replyTo){ replyTo = _replyTo;} + void setExpiration(const string& _expiration){ expiration = _expiration; } + void setMessageId(const string& _messageId){ messageId = _messageId; } + void setTimestamp(uint64_t _timestamp){ timestamp = _timestamp; } + void setType(const string& _type){ type = _type; } + void setUserId(const string& _userId){ userId = _userId; } + void setAppId(const string& _appId){appId = _appId; } + void setClusterId(const string& _clusterId){ clusterId = _clusterId; } + /** \internal + * Template to copy between types like BasicHeaderProperties. + */ + template <class T, class U> + static void copy(T& to, const U& from) { + to.setContentType(from.getContentType()); + to.setContentEncoding(from.getContentEncoding()); + to.setHeaders(from.getHeaders()); + to.setDeliveryMode(from.getDeliveryMode()); + to.setPriority(from.getPriority()); + to.setCorrelationId(from.getCorrelationId()); + to.setReplyTo(from.getReplyTo()); + to.setExpiration(from.getExpiration()); + to.setMessageId(from.getMessageId()); + to.setTimestamp(from.getTimestamp()); + to.setType(from.getType()); + to.setUserId(from.getUserId()); + to.setAppId(from.getAppId()); + to.setClusterId(from.getClusterId()); + } +}; +}} #endif diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index 3493924bf6..80e4c55d7e 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -67,11 +67,6 @@ struct MethodContext RequestId getRequestId() const; }; -// FIXME aconway 2007-02-01: Method context only required on Handler -// functions, not on Proxy functions. If we add set/getChannel(ChannelAdapter*) -// on AMQBody and set it during decodeing then we could get rid of the context. - - }} // namespace qpid::framing diff --git a/cpp/lib/common/shared_ptr.h b/cpp/lib/common/shared_ptr.h index 6725f7acb3..c4d547e5bb 100644 --- a/cpp/lib/common/shared_ptr.h +++ b/cpp/lib/common/shared_ptr.h @@ -20,10 +20,15 @@ */ #include <boost/shared_ptr.hpp> +#include <boost/cast.hpp> namespace qpid { -/// Import shared_ptr into qpid namespace. +/// Import shared_ptr definitions into qpid namespace. using boost::shared_ptr; +using boost::dynamic_pointer_cast; +using boost::static_pointer_cast; +using boost::const_pointer_cast; +using boost::shared_polymorphic_downcast; } // namespace qpid diff --git a/cpp/lib/common/sys/ProducerConsumer.cpp b/cpp/lib/common/sys/ProducerConsumer.cpp index 3f6156f230..7a0249f666 100644 --- a/cpp/lib/common/sys/ProducerConsumer.cpp +++ b/cpp/lib/common/sys/ProducerConsumer.cpp @@ -27,12 +27,12 @@ namespace sys { // // ================ ProducerConsumer ProducerConsumer::ProducerConsumer(size_t init_items) - : items(init_items), waiters(0), stopped(false) + : items(init_items), waiters(0), shutdownFlag(false) {} -void ProducerConsumer::stop() { +void ProducerConsumer::shutdown() { Mutex::ScopedLock l(monitor); - stopped = true; + shutdownFlag = true; monitor.notifyAll(); // Wait for waiting consumers to wake up. while (waiters > 0) @@ -55,16 +55,16 @@ ProducerConsumer::Lock::Lock(ProducerConsumer& p) : pc(p), lock(p.monitor), status(INCOMPLETE) {} bool ProducerConsumer::Lock::isOk() const { - return !pc.isStopped() && status==INCOMPLETE; + return !pc.isShutdown() && status==INCOMPLETE; } void ProducerConsumer::Lock::checkOk() const { - assert(!pc.isStopped()); + assert(!pc.isShutdown()); assert(status == INCOMPLETE); } ProducerConsumer::Lock::~Lock() { - assert(status != INCOMPLETE || pc.isStopped()); + assert(status != INCOMPLETE || pc.isShutdown()); } void ProducerConsumer::Lock::confirm() { @@ -96,7 +96,7 @@ ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p) { if (isOk()) { ScopedIncrement<size_t> inc(pc.waiters); - while (pc.items == 0 && !pc.stopped) { + while (pc.items == 0 && !pc.shutdownFlag) { pc.monitor.wait(); } } @@ -115,7 +115,7 @@ ProducerConsumer::ConsumerLock::ConsumerLock( else { Time deadline = now() + timeout; ScopedIncrement<size_t> inc(pc.waiters); - while (pc.items == 0 && !pc.stopped) { + while (pc.items == 0 && !pc.shutdownFlag) { if (!pc.monitor.wait(deadline)) { status = TIMEOUT; return; @@ -126,9 +126,9 @@ ProducerConsumer::ConsumerLock::ConsumerLock( } ProducerConsumer::ConsumerLock::~ConsumerLock() { - if (pc.isStopped()) { + if (pc.isShutdown()) { if (pc.waiters == 0) - pc.monitor.notifyAll(); // All waiters woken, notify stop thread(s) + pc.monitor.notifyAll(); // Notify shutdown thread(s) } else if (status==CONFIRMED) { pc.items--; diff --git a/cpp/lib/common/sys/ProducerConsumer.h b/cpp/lib/common/sys/ProducerConsumer.h index 742639323b..c7f42f266d 100644 --- a/cpp/lib/common/sys/ProducerConsumer.h +++ b/cpp/lib/common/sys/ProducerConsumer.h @@ -30,7 +30,7 @@ namespace sys { * * Producers increase the number of available items, consumers reduce it. * Consumers wait till an item is available. Waiting threads can be - * woken for shutdown using stop(). + * woken for shutdown using shutdown(). * * Note: Currently implements unbounded producer-consumer, i.e. no limit * to available items, producers never block. Can be extended to support @@ -43,16 +43,16 @@ class ProducerConsumer public: ProducerConsumer(size_t init_items=0); - ~ProducerConsumer() { stop(); } + ~ProducerConsumer() { shutdown(); } /** * Wake any threads waiting for ProducerLock or ConsumerLock. *@post No threads are waiting in Producer or Consumer locks. */ - void stop(); + void shutdown(); - /** True if queue is stopped */ - bool isStopped() { return stopped; } + /** True if queue is shutdown */ + bool isShutdown() { return shutdownFlag; } /** Number of items available for consumers */ size_t available() const; @@ -76,7 +76,7 @@ class ProducerConsumer *confirm() or cancel() before the lock goes out of scope. * * false means the lock failed - timed out or the - * ProducerConsumer is stopped. You should not do anything in + * ProducerConsumer is shutdown. You should not do anything in * the scope of the lock. */ bool isOk() const; @@ -98,8 +98,8 @@ class ProducerConsumer /** True if this lock experienced a timeout */ bool isTimedOut() const { return status == TIMEOUT; } - /** True if we have been stopped */ - bool isStopped() const { return pc.isStopped(); } + /** True if we have been shutdown */ + bool isShutdown() const { return pc.isShutdown(); } ProducerConsumer& pc; @@ -141,7 +141,7 @@ class ProducerConsumer * Wait up to timeout to acquire lock. *@post If isOk() caller has a producer lock. * If isTimedOut() there was a timeout. - * If neither then we were stopped. + * If neither then we were shutdown. */ ConsumerLock(ProducerConsumer& p, const Time& timeout); @@ -153,7 +153,7 @@ class ProducerConsumer mutable Monitor monitor; size_t items; size_t waiters; - bool stopped; + bool shutdownFlag; friend class Lock; friend class ProducerLock; diff --git a/cpp/lib/common/sys/ThreadSafeQueue.h b/cpp/lib/common/sys/ThreadSafeQueue.h index ff949a3e16..80ea92da0e 100644 --- a/cpp/lib/common/sys/ThreadSafeQueue.h +++ b/cpp/lib/common/sys/ThreadSafeQueue.h @@ -46,7 +46,7 @@ class ThreadSafeQueue } /** Pop a value from the front of the queue. Waits till value is available. - *@throw ShutdownException if queue is stopped while waiting. + *@throw ShutdownException if queue is shutdown while waiting. */ T pop() { ProducerConsumer::ConsumerLock consumer(pc); @@ -75,10 +75,10 @@ class ThreadSafeQueue } /** Interrupt threads waiting in pop() */ - void stop() { pc.stop(); } + void shutdown() { pc.shutdown(); } - /** True if queue is stopped */ - bool isStopped() { return pc.isStopped(); } + /** True if queue is shutdown */ + bool isShutdown() { return pc.isShutdown(); } /** Size of the queue */ size_t size() { ProducerConsumer::Lock l(pc); return container.size(); } diff --git a/cpp/tests/ClientChannelTest.cpp b/cpp/tests/ClientChannelTest.cpp index 7f7cfeda89..d041106a23 100644 --- a/cpp/tests/ClientChannelTest.cpp +++ b/cpp/tests/ClientChannelTest.cpp @@ -166,20 +166,13 @@ class ClientChannelTest : public CppUnit::TestCase void testGetFragmentedMessage() { string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size. channel.publish(Message(longStr), exchange, qname); - // FIXME aconway 2007-03-21: Remove couts. - cout << "==== Fragmented publish:" << endl - << connection.conversation << endl; Message getMsg; - cout << "==== Fragmented get:" << endl - << connection.conversation << endl; CPPUNIT_ASSERT(channel.get(getMsg, queue)); } void testConsumeFragmentedMessage() { string xx(FRAME_MAX*2, 'x'); channel.publish(Message(xx), exchange, qname); - cout << "==== Fragmented publish:" << endl - << connection.conversation << endl; channel.start(); string tag; channel.consume(queue, tag, &listener); @@ -190,10 +183,6 @@ class ClientChannelTest : public CppUnit::TestCase while (listener.messages.size() != 2) CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); } - // FIXME aconway 2007-03-21: - cout << "==== Fragmented consme 2 messages:" << endl - << connection.conversation << endl; - CPPUNIT_ASSERT_EQUAL(xx, listener.messages[0].getData()); CPPUNIT_ASSERT_EQUAL(yy, listener.messages[1].getData()); } diff --git a/cpp/tests/HeaderTest.cpp b/cpp/tests/HeaderTest.cpp index f77aaaedb4..77e68829c3 100644 --- a/cpp/tests/HeaderTest.cpp +++ b/cpp/tests/HeaderTest.cpp @@ -53,7 +53,7 @@ public: void testAllSpecificProperties(){ string contentType("text/html"); string contentEncoding("UTF8"); - uint8_t deliveryMode(2); + DeliveryMode deliveryMode(PERSISTENT); uint8_t priority(3); string correlationId("abc"); string replyTo("no-address"); @@ -106,7 +106,7 @@ public: void testSomeSpecificProperties(){ string contentType("application/octet-stream"); - uint8_t deliveryMode(5); + DeliveryMode deliveryMode(PERSISTENT); uint8_t priority(6); string expiration("Z"); uint64_t timestamp(0xabe4a34a); diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp index a3bbfe4afc..9d200cf001 100644 --- a/cpp/tests/MessageTest.cpp +++ b/cpp/tests/MessageTest.cpp @@ -69,12 +69,11 @@ class MessageTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange()); CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey()); CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId()); - CPPUNIT_ASSERT_EQUAL((uint8_t) PERSISTENT, msg->getHeaderProperties()->getDeliveryMode()); + CPPUNIT_ASSERT_EQUAL(PERSISTENT, msg->getHeaderProperties()->getDeliveryMode()); CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc")); CPPUNIT_ASSERT_EQUAL((uint64_t) 14, msg->contentSize()); MockChannel channel(1); - // FIXME aconway 2007-02-02: deliver should take ProtocolVersion msg->deliver(channel, "ignore", 0, 100); CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size()); AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[2]->getBody())); diff --git a/cpp/tests/ProducerConsumerTest.cpp b/cpp/tests/ProducerConsumerTest.cpp index 1f2aeffbc5..ee94a56c55 100644 --- a/cpp/tests/ProducerConsumerTest.cpp +++ b/cpp/tests/ProducerConsumerTest.cpp @@ -95,7 +95,7 @@ class ProducerConsumerTest : public CppUnit::TestCase CPPUNIT_TEST_SUITE(ProducerConsumerTest); CPPUNIT_TEST(testProduceConsume); CPPUNIT_TEST(testTimeout); - CPPUNIT_TEST(testStop); + CPPUNIT_TEST(testShutdown); CPPUNIT_TEST(testCancel); CPPUNIT_TEST_SUITE_END(); @@ -103,7 +103,7 @@ class ProducerConsumerTest : public CppUnit::TestCase client::InProcessBrokerClient client; ProducerConsumer pc; - WatchedCounter stopped; + WatchedCounter shutdown; WatchedCounter timeout; WatchedCounter consumed; WatchedCounter produced; @@ -124,8 +124,8 @@ class ProducerConsumerTest : public CppUnit::TestCase void consumeInternal(ProducerConsumer::ConsumerLock& consumer) { - if (pc.isStopped()) { - ++stopped; + if (pc.isShutdown()) { + ++shutdown; return; } if (consumer.isTimedOut()) { @@ -189,7 +189,7 @@ public: produce(); CPPUNIT_ASSERT(consumed.waitFor(5)); join(threads); - CPPUNIT_ASSERT_EQUAL(0, int(stopped)); + CPPUNIT_ASSERT_EQUAL(0, int(shutdown)); } void testTimeout() { @@ -219,30 +219,30 @@ public: } - void testStop() { + void testShutdown() { ConsumeRunnable runMe(*this); vector<Thread> threads = startThreads(2, runMe); while (pc.consumers() != 2) Thread::yield(); - pc.stop(); - CPPUNIT_ASSERT(stopped.waitFor(2)); + pc.shutdown(); + CPPUNIT_ASSERT(shutdown.waitFor(2)); join(threads); - threads = startThreads(1, runMe); // Should stop immediately. - CPPUNIT_ASSERT(stopped.waitFor(3)); + threads = startThreads(1, runMe); // Should shutdown immediately. + CPPUNIT_ASSERT(shutdown.waitFor(3)); join(threads); - // Produce/consume while stopped should return isStopped and + // Produce/consume while shutdown should return isShutdown and // throw on confirm. try { ProducerConsumer::ProducerLock p(pc); - CPPUNIT_ASSERT(pc.isStopped()); + CPPUNIT_ASSERT(pc.isShutdown()); CPPUNIT_FAIL("Expected exception"); } catch (...) {} // Expected try { ProducerConsumer::ConsumerLock c(pc); - CPPUNIT_ASSERT(pc.isStopped()); + CPPUNIT_ASSERT(pc.isShutdown()); CPPUNIT_FAIL("Expected exception"); } catch (...) {} // Expected diff --git a/cpp/tests/setup b/cpp/tests/setup index 9dde4758b1..a50aa8a5cc 100644 --- a/cpp/tests/setup +++ b/cpp/tests/setup @@ -16,12 +16,13 @@ cd $tmp || framework_failure=1 gen_supp=--gen-suppressions=all # This option makes valgrind significantly slower. full_leak_check=--leak-check=full +demangle=--demangle=yes vg_options=" --suppressions=$abs_srcdir/.vg-supp --num-callers=25 - --demangle=no --track-fds=yes + $demangle $full_leak_check $gen_supp " diff --git a/cpp/tests/topictest b/cpp/tests/topictest index 76807b82bd..92e40b2c37 100755 --- a/cpp/tests/topictest +++ b/cpp/tests/topictest @@ -34,5 +34,6 @@ publish() { for ((i=$SUBSCRIBERS ; i--; )); do subscribe $i & done - +# FIXME aconway 2007-03-27: Hack around startup race. Fix topic test. +sleep 1 publish 2>&1 || exit 1 |