diff options
author | Alan Conway <aconway@apache.org> | 2007-03-21 19:12:14 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-03-21 19:12:14 +0000 |
commit | d66d50b103ab12df58132ce17ed5892df29b4b5c (patch) | |
tree | 0cdeb9f8365be75539e000f7e1d4a76387655b94 | |
parent | 6625d0c47f5252af8d64abce773583ec27f28116 (diff) | |
download | qpid-python-d66d50b103ab12df58132ce17ed5892df29b4b5c.tar.gz |
Refactored client side for dual-mode Channel supporting either 0-9 Message or 0-8 Basic.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@520972 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/lib/client/AckMode.h | 102 | ||||
-rw-r--r-- | qpid/cpp/lib/client/BasicMessageChannel.cpp | 261 | ||||
-rw-r--r-- | qpid/cpp/lib/client/BasicMessageChannel.h | 87 | ||||
-rw-r--r-- | qpid/cpp/lib/client/ClientChannel.cpp | 51 | ||||
-rw-r--r-- | qpid/cpp/lib/client/ClientChannel.h | 135 | ||||
-rw-r--r-- | qpid/cpp/lib/client/ClientMessage.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/lib/client/IncomingMessage.h | 1 | ||||
-rw-r--r-- | qpid/cpp/lib/client/Makefile.am | 6 | ||||
-rw-r--r-- | qpid/cpp/lib/client/MessageChannel.h | 94 | ||||
-rw-r--r-- | qpid/cpp/lib/common/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/lib/common/framing/FramingContent.h | 1 | ||||
-rw-r--r-- | qpid/cpp/lib/common/shared_ptr.h | 31 | ||||
-rw-r--r-- | qpid/cpp/tests/ClientChannelTest.cpp | 71 | ||||
-rw-r--r-- | qpid/cpp/tests/InProcessBroker.h | 25 | ||||
-rw-r--r-- | qpid/cpp/tests/ProducerConsumerTest.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/tests/client_test.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/tests/echo_service.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/tests/topic_listener.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/tests/topic_publisher.cpp | 9 |
19 files changed, 831 insertions, 74 deletions
diff --git a/qpid/cpp/lib/client/AckMode.h b/qpid/cpp/lib/client/AckMode.h new file mode 100644 index 0000000000..9ad5ef925c --- /dev/null +++ b/qpid/cpp/lib/client/AckMode.h @@ -0,0 +1,102 @@ +#ifndef _client_AckMode_h +#define _client_AckMode_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace qpid { +namespace client { + +/** + * The available acknowledgements modes. + * + * \ingroup clientapi + */ +enum AckMode { + /** No acknowledgement will be sent, broker can + discard messages as soon as they are delivered + to a consumer using this mode. **/ + NO_ACK = 0, + /** Each message will be automatically + acknowledged as soon as it is delivered to the + application **/ + AUTO_ACK = 1, + /** Acknowledgements will be sent automatically, + but not for each message. **/ + LAZY_ACK = 2, + /** The application is responsible for explicitly + acknowledging messages. **/ + CLIENT_ACK = 3 +}; + +}} // namespace qpid::client + + + +#endif /*!_client_AckMode_h*/ +#ifndef _client_AckMode_h +#define _client_AckMode_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace qpid { +namespace client { + +/** + * The available acknowledgements modes. + * + * \ingroup clientapi + */ +enum AckMode { + /** No acknowledgement will be sent, broker can + discard messages as soon as they are delivered + to a consumer using this mode. **/ + NO_ACK = 0, + /** Each message will be automatically + acknowledged as soon as it is delivered to the + application **/ + AUTO_ACK = 1, + /** Acknowledgements will be sent automatically, + but not for each message. **/ + LAZY_ACK = 2, + /** The application is responsible for explicitly + acknowledging messages. **/ + CLIENT_ACK = 3 +}; + +}} // namespace qpid::client + + + +#endif /*!_client_AckMode_h*/ diff --git a/qpid/cpp/lib/client/BasicMessageChannel.cpp b/qpid/cpp/lib/client/BasicMessageChannel.cpp new file mode 100644 index 0000000000..012a55b9ea --- /dev/null +++ b/qpid/cpp/lib/client/BasicMessageChannel.cpp @@ -0,0 +1,261 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "BasicMessageChannel.h" +#include "AMQMethodBody.h" +#include "ClientChannel.h" +#include "ReturnedMessageHandler.h" +#include "MessageListener.h" +#include "framing/FieldTable.h" +#include "Connection.h" + +using namespace std; + +namespace qpid { +namespace client { + +using namespace sys; +using namespace framing; + +BasicMessageChannel::BasicMessageChannel(Channel& ch) + : channel(ch), returnsHandler(0) {} + +void BasicMessageChannel::consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) +{ + channel.sendAndReceiveSync<BasicConsumeOkBody>( + synch, + new BasicConsumeBody( + channel.version, 0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, + fields ? *fields : FieldTable())); + if (synch) { + BasicConsumeOkBody::shared_ptr response = + boost::shared_polymorphic_downcast<BasicConsumeOkBody>( + channel.responses.getResponse()); + tag = response->getConsumerTag(); + } + // FIXME aconway 2007-02-20: Race condition! + // We could receive the first message for the consumer + // before we create the consumer below. + // Move consumer creation to handler for BasicConsumeOkBody + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + THROW_QPID_ERROR(CLIENT_ERROR, + "Consumer already exists with tag="+tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.lastDeliveryTag = 0; + } +} + + +void BasicMessageChannel::cancel(const std::string& tag, bool synch) { + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); + } + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + channel.sendAndReceiveSync<BasicCancelOkBody>( + synch, new BasicCancelBody(channel.version, tag, !synch)); +} + +void BasicMessageChannel::close(){ + ConsumerMap consumersCopy; + { + Mutex::ScopedLock l(lock); + consumersCopy = consumers; + consumers.clear(); + } + for (ConsumerMap::iterator i=consumersCopy.begin(); + i != consumersCopy.end(); ++i) + { + Consumer& c = i->second; + if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) + && c.lastDeliveryTag > 0) + { + channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + } + } + incoming.shutdown(); +} + + + +bool BasicMessageChannel::get(Message& msg, const Queue& queue, AckMode ackMode) { + // Expect a message starting with a BasicGetOk + incoming.startGet(); + channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); + return incoming.waitGet(msg); +} + +void BasicMessageChannel::publish( + const Message& msg, const Exchange& exchange, + const std::string& routingKey, bool mandatory, bool immediate) +{ + msg.getHeader()->setContentSize(msg.getData().size()); + const string e = exchange.getName(); + string key = routingKey; + channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate)); + //break msg up into header frame and content frame(s) and send these + channel.send(msg.getHeader()); + string data = msg.getData(); + u_int64_t data_length = data.length(); + if(data_length > 0){ + //frame itself uses 8 bytes + u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8; + if(data_length < frag_size){ + channel.send(new AMQContentBody(data)); + }else{ + u_int32_t offset = 0; + u_int32_t remaining = data_length - offset; + while (remaining > 0) { + u_int32_t length = remaining > frag_size ? frag_size : remaining; + string frag(data.substr(offset, length)); + channel.send(new AMQContentBody(frag)); + + offset += length; + remaining = data_length - offset; + } + } + } +} + +void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { + assert(method->amqpClassId() ==BasicGetBody::CLASS_ID); + switch(method->amqpMethodId()) { + case BasicDeliverBody::METHOD_ID: + case BasicReturnBody::METHOD_ID: + case BasicGetOkBody::METHOD_ID: + case BasicGetEmptyBody::METHOD_ID: + incoming.add(method); + return; + } + throw Channel::UnknownMethod(); +} + +void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr body){ + incoming.add(body); +} + +void BasicMessageChannel::handle(AMQContentBody::shared_ptr body){ + incoming.add(body); +} + +void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){ + //record delivery tag: + consumer.lastDeliveryTag = msg.getDeliveryTag(); + + //allow registered listener to handle the message + consumer.listener->received(msg); + + if(channel.isOpen()){ + bool multiple(false); + switch(consumer.ackMode){ + case LAZY_ACK: + multiple = true; + if(++(consumer.count) < channel.getPrefetch()) + break; + //else drop-through + case AUTO_ACK: + consumer.lastDeliveryTag = 0; + channel.send( + new BasicAckBody( + channel.version, msg.getDeliveryTag(), multiple)); + case NO_ACK: // Nothing to do + case CLIENT_ACK: // User code must ack. + break; + // TODO aconway 2007-02-22: Provide a way for user + // to ack! + } + } + + //as it stands, transactionality is entirely orthogonal to ack + //mode, though the acks will not be processed by the broker under + //a transaction until it commits. +} + + +void BasicMessageChannel::run() { + while(channel.isOpen()) { + try { + Message msg = incoming.waitDispatch(); + if(msg.getMethod()->isA<BasicReturnBody>()) { + ReturnedMessageHandler* handler=0; + { + Mutex::ScopedLock l(lock); + handler=returnsHandler; + } + if(handler == 0) { + // TODO aconway 2007-02-20: proper logging. + cout << "Message returned: " << msg.getData() << endl; + } + else + handler->returned(msg); + } + else { + BasicDeliverBody::shared_ptr deliverBody = + boost::shared_polymorphic_downcast<BasicDeliverBody>( + msg.getMethod()); + std::string tag = deliverBody->getConsumerTag(); + Consumer consumer; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if(i == consumers.end()) + THROW_QPID_ERROR(PROTOCOL_ERROR+504, + "Unknown consumer tag=" + tag); + consumer = i->second; + } + deliver(consumer, msg); + } + } + catch (const ShutdownException&) { + /* Orderly shutdown */ + } + catch (const Exception& e) { + // FIXME aconway 2007-02-20: Report exception to user. + cout << "client::Basic::run() terminated by: " << e.toString() + << "(" << typeid(e).name() << ")" << endl; + } + } +} + +void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ + Mutex::ScopedLock l(lock); + returnsHandler = handler; +} + +void BasicMessageChannel::setQos(){ + channel.sendAndReceive<BasicQosOkBody>( + new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)); + if(channel.isTransactional()) + channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version)); +} + +}} // namespace qpid::client diff --git a/qpid/cpp/lib/client/BasicMessageChannel.h b/qpid/cpp/lib/client/BasicMessageChannel.h new file mode 100644 index 0000000000..b921ec24d9 --- /dev/null +++ b/qpid/cpp/lib/client/BasicMessageChannel.h @@ -0,0 +1,87 @@ +#ifndef _client_BasicMessageChannel_h +#define _client_BasicMessageChannel_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "MessageChannel.h" +#include "IncomingMessage.h" + +namespace qpid { +namespace client { +/** + * Messaging implementation using AMQP 0-8 BasicMessageChannel class + * to send and receiving messages. + */ +class BasicMessageChannel : public MessageChannel +{ + public: + BasicMessageChannel(Channel& parent); + + void consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, + const framing::FieldTable* fields = 0); + + void cancel(const std::string& tag, bool synch = true); + + bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); + + void publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory = false, bool immediate = false); + + void setReturnedMessageHandler(ReturnedMessageHandler* handler); + + void run(); + + void handle(boost::shared_ptr<framing::AMQMethodBody>); + + void handle(shared_ptr<framing::AMQHeaderBody>); + + void handle(shared_ptr<framing::AMQContentBody>); + + void setQos(); + + void close(); + + private: + + struct Consumer{ + MessageListener* listener; + AckMode ackMode; + int count; + u_int64_t lastDeliveryTag; + }; + + typedef std::map<std::string, Consumer> ConsumerMap; + + void deliver(Consumer& consumer, Message& msg); + + sys::Mutex lock; + Channel& channel; + IncomingMessage incoming; + ConsumerMap consumers; + ReturnedMessageHandler* returnsHandler; +}; + +}} // namespace qpid::client + + + +#endif /*!_client_BasicMessageChannel_h*/ diff --git a/qpid/cpp/lib/client/ClientChannel.cpp b/qpid/cpp/lib/client/ClientChannel.cpp index 84aa73e6bc..97e0a394d2 100644 --- a/qpid/cpp/lib/client/ClientChannel.cpp +++ b/qpid/cpp/lib/client/ClientChannel.cpp @@ -25,6 +25,9 @@ #include <QpidError.h> #include <MethodBodyInstances.h> #include "Connection.h" +#include "BasicMessageChannel.h" +// FIXME aconway 2007-03-21: +//#include "MessageMessageChannel.h" // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent // handling of errors that should close the connection or the channel. @@ -36,8 +39,10 @@ using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(bool _transactional, uint16_t _prefetch) : - basic(*this), +Channel::Channel(bool _transactional, u_int16_t _prefetch, + MessageChannel* impl) : + // FIXME aconway 2007-03-21: MessageMessageChannel + messaging(impl ? impl : new BasicMessageChannel(*this)), connection(0), prefetch(_prefetch), transactional(_transactional) @@ -115,7 +120,7 @@ void Channel::protocolInit( bool Channel::isOpen() const { return connection; } void Channel::setQos() { - basic.setQos(); + messaging->setQos(); // FIXME aconway 2007-02-22: message } @@ -192,7 +197,7 @@ void Channel::handleMethodInContext( } try { switch (method->amqpClassId()) { - case BasicDeliverBody::CLASS_ID: basic.handle(method); break; + case BasicDeliverBody::CLASS_ID: messaging->handle(method); break; case ChannelCloseBody::CLASS_ID: handleChannel(method); break; case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; default: throw UnknownMethod(); @@ -226,11 +231,11 @@ void Channel::handleConnection(AMQMethodBody::shared_ptr method) { } void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - basic.incoming.add(body); + messaging->handle(body); } void Channel::handleContent(AMQContentBody::shared_ptr body){ - basic.incoming.add(body); + messaging->handle(body); } void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ @@ -238,7 +243,7 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void Channel::start(){ - basicDispatcher = Thread(basic); + dispatcher = Thread(*messaging); } // Close called by local application. @@ -274,13 +279,12 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) { void Channel::closeInternal() { if (isOpen()); { - basic.cancelAll(); - basic.incoming.shutdown(); + messaging->close(); connection = 0; // A 0 response means we are closed. responses.signalResponse(AMQMethodBody::shared_ptr()); } - basicDispatcher.join(); + dispatcher.join(); } void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m) @@ -299,4 +303,31 @@ void Channel::sendAndReceiveSync( send(body); } +void Channel::consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { + messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields); +} + +void Channel::cancel(const std::string& tag, bool synch) { + messaging->cancel(tag, synch); +} + +bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { + return messaging->get(msg, queue, ackMode); +} + +void Channel::publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory, bool immediate) { + messaging->publish(msg, exchange, routingKey, mandatory, immediate); +} + +void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) { + messaging->setReturnedMessageHandler(handler); +} + +void Channel::run() { + messaging->run(); +} diff --git a/qpid/cpp/lib/client/ClientChannel.h b/qpid/cpp/lib/client/ClientChannel.h index 3ecab05d0b..58a007977d 100644 --- a/qpid/cpp/lib/client/ClientChannel.h +++ b/qpid/cpp/lib/client/ClientChannel.h @@ -21,7 +21,7 @@ * under the License. * */ -#include "sys/types.h" +#include <boost/scoped_ptr.hpp> #include <framing/amqp_framing.h> #include <ClientExchange.h> #include <ClientMessage.h> @@ -29,7 +29,7 @@ #include <ResponseHandler.h> #include "ChannelAdapter.h" #include "Thread.h" -#include "Basic.h" +#include "AckMode.h" namespace qpid { @@ -41,7 +41,9 @@ class AMQMethodBody; namespace client { class Connection; - +class MessageChannel; +class MessageListener; +class ReturnedMessageHandler; /** * Represents an AMQP channel, i.e. loosely a session of work. It @@ -53,16 +55,12 @@ class Connection; class Channel : public framing::ChannelAdapter { private: - // TODO aconway 2007-02-22: Remove friendship. - friend class Basic; - // FIXME aconway 2007-02-22: friend class Message; - struct UnknownMethod {}; sys::Mutex lock; - Basic basic; + boost::scoped_ptr<MessageChannel> messaging; Connection* connection; - sys::Thread basicDispatcher; + sys::Thread dispatcher; ResponseHandler responses; uint16_t prefetch; @@ -107,7 +105,10 @@ class Channel : public framing::ChannelAdapter void closeInternal(); void peerClose(boost::shared_ptr<framing::ChannelCloseBody>); + // FIXME aconway 2007-02-23: Get rid of friendships. friend class Connection; + friend class BasicMessageChannel; // for sendAndReceive. + friend class MessageMessageChannel; // for sendAndReceive. public: @@ -121,8 +122,15 @@ class Channel : public framing::ChannelAdapter * @param prefetch specifies the number of unacknowledged * messages the channel is willing to have sent to it * asynchronously + * + * @param messageImpl Alternate messaging implementation class to + * allow alternate protocol implementations of messaging + * operations. Takes ownership. */ - Channel(bool transactional = false, uint16_t prefetch = 500); + Channel( + bool transactional = false, u_int16_t prefetch = 500, + MessageChannel* messageImpl = 0); + ~Channel(); /** @@ -190,13 +198,6 @@ class Channel : public framing::ChannelAdapter bool synch = true); /** - * Get a Basic object which provides functions to send and - * receive messages using the AMQP 0-8 Basic class methods. - *@see Basic - */ - Basic& getBasic() { return basic; } - - /** * For a transactional channel this will commit all * publications and acknowledgements since the last commit (or * the channel was opened if there has been no previous @@ -243,6 +244,106 @@ class Channel : public framing::ChannelAdapter /** True if the channel is open */ bool isOpen() const; + + /** Get the connection associated with this channel */ + Connection& getConnection() { return *connection; } + + /** Return the protocol version */ + framing::ProtocolVersion getVersion() const { return version ; } + + /** + * Creates a 'consumer' for a queue. Messages in (or arriving + * at) that queue will be delivered to consumers + * asynchronously. + * + * @param queue a Queue instance representing the queue to + * consume from + * + * @param tag an identifier to associate with the consumer + * that can be used to cancel its subscription (if empty, this + * will be assigned by the broker) + * + * @param listener a pointer to an instance of an + * implementation of the MessageListener interface. Messages + * received from this queue for this consumer will result in + * invocation of the received() method on the listener, with + * the message itself passed in. + * + * @param ackMode the mode of acknowledgement that the broker + * should assume for this consumer. @see AckMode + * + * @param noLocal if true, this consumer will not be sent any + * message published by this connection + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, + const framing::FieldTable* fields = 0); + + /** + * Cancels a subscription previously set up through a call to consume(). + * + * @param tag the identifier used (or assigned) in the consume + * request that set up the subscription to be cancelled. + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void cancel(const std::string& tag, bool synch = true); + /** + * Synchronous pull of a message from a queue. + * + * @param msg a message object that will contain the message + * headers and content if the call completes. + * + * @param queue the queue to consume from + * + * @param ackMode the acknowledgement mode to use (@see + * AckMode) + * + * @return true if a message was succcessfully dequeued from + * the queue, false if the queue was empty. + */ + bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); + + /** + * Publishes (i.e. sends a message to the broker). + * + * @param msg the message to publish + * + * @param exchange the exchange to publish the message to + * + * @param routingKey the routing key to publish with + * + * @param mandatory if true and the exchange to which this + * publish is directed has no matching bindings, the message + * will be returned (see setReturnedMessageHandler()). + * + * @param immediate if true and there is no consumer to + * receive this message on publication, the message will be + * returned (see setReturnedMessageHandler()). + */ + void publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory = false, bool immediate = false); + + /** + * Set a handler for this channel that will process any + * returned messages + * + * @see publish() + */ + void setReturnedMessageHandler(ReturnedMessageHandler* handler); + + /** + * Deliver messages from the broker to the appropriate MessageListener. + */ + void run(); + + }; }} diff --git a/qpid/cpp/lib/client/ClientMessage.cpp b/qpid/cpp/lib/client/ClientMessage.cpp index f55c4abfe6..3ad2d0b2f4 100644 --- a/qpid/cpp/lib/client/ClientMessage.cpp +++ b/qpid/cpp/lib/client/ClientMessage.cpp @@ -30,7 +30,6 @@ Message::Message(const std::string& d) void Message::setData(const std::string& d) { data = d; - header->setContentSize(d.size()); } Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ diff --git a/qpid/cpp/lib/client/IncomingMessage.h b/qpid/cpp/lib/client/IncomingMessage.h index 2d7c8723c5..6ec949028d 100644 --- a/qpid/cpp/lib/client/IncomingMessage.h +++ b/qpid/cpp/lib/client/IncomingMessage.h @@ -43,6 +43,7 @@ namespace client { * * Broker initiated messages (basic.return, basic.deliver) are * queued for handling by the user dispatch thread. + * */ class IncomingMessage { public: diff --git a/qpid/cpp/lib/client/Makefile.am b/qpid/cpp/lib/client/Makefile.am index 1e10a2a244..46d8775072 100644 --- a/qpid/cpp/lib/client/Makefile.am +++ b/qpid/cpp/lib/client/Makefile.am @@ -14,7 +14,7 @@ libqpidclient_la_SOURCES = \ ClientExchange.cpp \ ClientMessage.cpp \ ClientQueue.cpp \ - Basic.cpp \ + BasicMessageChannel.cpp \ Connection.cpp \ Connector.cpp \ IncomingMessage.cpp \ @@ -22,14 +22,16 @@ libqpidclient_la_SOURCES = \ ResponseHandler.cpp \ ReturnedMessageHandler.cpp pkginclude_HEADERS = \ + AckMode.h \ ClientChannel.h \ ClientExchange.h \ ClientMessage.h \ ClientQueue.h \ - Basic.h \ Connection.h \ Connector.h \ IncomingMessage.h \ + MessageChannel.h \ + BasicMessageChannel.h \ MessageListener.h \ MethodBodyInstances.h \ ResponseHandler.h \ diff --git a/qpid/cpp/lib/client/MessageChannel.h b/qpid/cpp/lib/client/MessageChannel.h new file mode 100644 index 0000000000..028a19a949 --- /dev/null +++ b/qpid/cpp/lib/client/MessageChannel.h @@ -0,0 +1,94 @@ +#ifndef _client_MessageChannel_h +#define _client_MessageChannel_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "shared_ptr.h" +#include "sys/Runnable.h" +#include "AckMode.h" + +namespace qpid { + +namespace framing { +class AMQMethodBody; +class AMQHeaderBody; +class AMQContentBody; +class FieldTable; +} + +namespace client { + +class Channel; +class Message; +class Queue; +class Exchange; +class MessageListener; +class ReturnedMessageHandler; + +/** + * Abstract interface for messaging implementation for a channel. + * + *@see Channel for documentation. + */ +class MessageChannel : public sys::Runnable +{ + public: + /**@see Channel::consume */ + virtual void consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, + const framing::FieldTable* fields = 0) = 0; + + /**@see Channel::cancel */ + virtual void cancel(const std::string& tag, bool synch = true) = 0; + + /**@see Channel::get */ + virtual bool get( + Message& msg, const Queue& queue, AckMode ackMode = NO_ACK) = 0; + + /**@see Channel::get */ + virtual void publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory = false, bool immediate = false) = 0; + + /**@see Channel::setReturnedMessageHandler */ + virtual void setReturnedMessageHandler( + ReturnedMessageHandler* handler) = 0; + + /** Handle an incoming method. */ + virtual void handle(shared_ptr<framing::AMQMethodBody>) = 0; + + /** Handle an incoming header */ + virtual void handle(shared_ptr<framing::AMQHeaderBody>) = 0; + + /** Handle an incoming content */ + virtual void handle(shared_ptr<framing::AMQContentBody>) = 0; + + /** Send channel's QOS settings */ + virtual void setQos() = 0; + + /** Channel is closing */ + virtual void close() = 0; +}; + +}} // namespace qpid::client + + + +#endif /*!_client_MessageChannel_h*/ diff --git a/qpid/cpp/lib/common/Makefile.am b/qpid/cpp/lib/common/Makefile.am index d70adf1186..355f764b6e 100644 --- a/qpid/cpp/lib/common/Makefile.am +++ b/qpid/cpp/lib/common/Makefile.am @@ -117,6 +117,7 @@ nobase_pkginclude_HEADERS = \ $(framing)/amqp_framing.h \ $(framing)/amqp_types.h \ $(framing)/Proxy.h \ + shared_ptr.h \ Exception.h \ ExceptionHolder.h \ QpidError.h \ diff --git a/qpid/cpp/lib/common/framing/FramingContent.h b/qpid/cpp/lib/common/framing/FramingContent.h index 696bcc7c1a..876e90c905 100644 --- a/qpid/cpp/lib/common/framing/FramingContent.h +++ b/qpid/cpp/lib/common/framing/FramingContent.h @@ -30,6 +30,7 @@ class Content bool isInline() const { return discriminator == INLINE; } bool isReference() const { return discriminator == REFERENCE; } const string& getValue() const { return value; } + void setValue(const string& newValue) { value = newValue; } friend std::ostream& operator<<(std::ostream&, const Content&); }; diff --git a/qpid/cpp/lib/common/shared_ptr.h b/qpid/cpp/lib/common/shared_ptr.h new file mode 100644 index 0000000000..6725f7acb3 --- /dev/null +++ b/qpid/cpp/lib/common/shared_ptr.h @@ -0,0 +1,31 @@ +#ifndef _common_shared_ptr_h +#define _common_shared_ptr_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/shared_ptr.hpp> + +namespace qpid { +/// Import shared_ptr into qpid namespace. +using boost::shared_ptr; +} // namespace qpid + + + +#endif /*!_common_shared_ptr_h*/ diff --git a/qpid/cpp/tests/ClientChannelTest.cpp b/qpid/cpp/tests/ClientChannelTest.cpp index 7b0bc363fe..f22170691c 100644 --- a/qpid/cpp/tests/ClientChannelTest.cpp +++ b/qpid/cpp/tests/ClientChannelTest.cpp @@ -32,6 +32,10 @@ using namespace qpid::client; using namespace qpid::sys; using namespace qpid::framing; +/// Small frame size so we can create fragmented messages. +const size_t FRAME_MAX = 256; + + /** * Test client API using an in-process broker. */ @@ -42,6 +46,8 @@ class ClientChannelTest : public CppUnit::TestCase CPPUNIT_TEST(testGetNoContent); CPPUNIT_TEST(testConsumeCancel); CPPUNIT_TEST(testConsumePublished); + CPPUNIT_TEST(testGetFragmentedMessage); + CPPUNIT_TEST(testConsumeFragmentedMessage); CPPUNIT_TEST_SUITE_END(); struct Listener: public qpid::client::MessageListener { @@ -65,7 +71,8 @@ class ClientChannelTest : public CppUnit::TestCase public: ClientChannelTest() - : qname("testq"), data("hello"), + : connection(FRAME_MAX), + qname("testq"), data("hello"), queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE) { connection.openChannel(channel); @@ -76,21 +83,21 @@ class ClientChannelTest : public CppUnit::TestCase void testPublishGet() { Message pubMsg(data); pubMsg.getHeaders().setString("hello", "world"); - channel.getBasic().publish(pubMsg, exchange, qname); + channel.publish(pubMsg, exchange, qname); Message getMsg; - CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue)); + CPPUNIT_ASSERT(channel.get(getMsg, queue)); CPPUNIT_ASSERT_EQUAL(data, getMsg.getData()); CPPUNIT_ASSERT_EQUAL(string("world"), getMsg.getHeaders().getString("hello")); - CPPUNIT_ASSERT(!channel.getBasic().get(getMsg, queue)); // Empty queue + CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue } void testGetNoContent() { Message pubMsg; pubMsg.getHeaders().setString("hello", "world"); - channel.getBasic().publish(pubMsg, exchange, qname); + channel.publish(pubMsg, exchange, qname); Message getMsg; - CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue)); + CPPUNIT_ASSERT(channel.get(getMsg, queue)); CPPUNIT_ASSERT(getMsg.getData().empty()); CPPUNIT_ASSERT_EQUAL(string("world"), getMsg.getHeaders().getString("hello")); @@ -98,10 +105,10 @@ class ClientChannelTest : public CppUnit::TestCase void testConsumeCancel() { string tag; // Broker assigned - channel.getBasic().consume(queue, tag, &listener); + channel.consume(queue, tag, &listener); channel.start(); CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); - channel.getBasic().publish(Message("a"), exchange, qname); + channel.publish(Message("a"), exchange, qname); { Mutex::ScopedLock l(listener.monitor); Time deadline(now() + 1*TIME_SEC); @@ -112,8 +119,8 @@ class ClientChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size()); CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData()); - channel.getBasic().publish(Message("b"), exchange, qname); - channel.getBasic().publish(Message("c"), exchange, qname); + channel.publish(Message("b"), exchange, qname); + channel.publish(Message("c"), exchange, qname); { Mutex::ScopedLock l(listener.monitor); while (listener.messages.size() != 3) { @@ -124,15 +131,15 @@ class ClientChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData()); CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData()); - channel.getBasic().cancel(tag); - channel.getBasic().publish(Message("d"), exchange, qname); + channel.cancel(tag); + channel.publish(Message("d"), exchange, qname); CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); { Mutex::ScopedLock l(listener.monitor); CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2)); } Message msg; - CPPUNIT_ASSERT(channel.getBasic().get(msg, queue)); + CPPUNIT_ASSERT(channel.get(msg, queue)); CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData()); } @@ -140,9 +147,9 @@ class ClientChannelTest : public CppUnit::TestCase void testConsumePublished() { Message pubMsg("x"); pubMsg.getHeaders().setString("y", "z"); - channel.getBasic().publish(pubMsg, exchange, qname); + channel.publish(pubMsg, exchange, qname); string tag; - channel.getBasic().consume(queue, tag, &listener); + channel.consume(queue, tag, &listener); CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); channel.start(); { @@ -155,8 +162,40 @@ class ClientChannelTest : public CppUnit::TestCase listener.messages[0].getHeaders().getString("y")); } + void testGetFragmentedMessage() { + string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size. + channel.publish(Message(longStr), exchange, qname); + // FIXME aconway 2007-03-21: Remove couts. + cout << "==== Fragmented publish:" << endl + << connection.conversation << endl; + Message getMsg; + cout << "==== Fragmented get:" << endl + << connection.conversation << endl; + CPPUNIT_ASSERT(channel.get(getMsg, queue)); + } - + void testConsumeFragmentedMessage() { + string xx(FRAME_MAX*2, 'x'); + channel.publish(Message(xx), exchange, qname); + cout << "==== Fragmented publish:" << endl + << connection.conversation << endl; + channel.start(); + string tag; + channel.consume(queue, tag, &listener); + string yy(FRAME_MAX*2, 'y'); + channel.publish(Message(yy), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 2) + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + // FIXME aconway 2007-03-21: + cout << "==== Fragmented consme 2 messages:" << endl + << connection.conversation << endl; + + CPPUNIT_ASSERT_EQUAL(xx, listener.messages[0].getData()); + CPPUNIT_ASSERT_EQUAL(yy, listener.messages[1].getData()); + } }; // Make this test suite a plugin. diff --git a/qpid/cpp/tests/InProcessBroker.h b/qpid/cpp/tests/InProcessBroker.h index 709ca9b953..833b821d11 100644 --- a/qpid/cpp/tests/InProcessBroker.h +++ b/qpid/cpp/tests/InProcessBroker.h @@ -145,25 +145,30 @@ std::ostream& operator<<( return out; } +} // namespace broker -}} // namespace qpid::broker - +namespace client { /** An in-process client+broker all in one. */ -class InProcessBrokerClient : public qpid::client::Connection { +class InProcessBrokerClient : public client::Connection { public: - qpid::broker::InProcessBroker broker; - qpid::broker::InProcessBroker::Conversation& conversation; + broker::InProcessBroker broker; + broker::InProcessBroker::Conversation& conversation; /** Constructor creates broker and opens client connection. */ - InProcessBrokerClient(qpid::framing::ProtocolVersion version= - qpid::framing::highestProtocolVersion - ) : broker(version), conversation(broker.conversation) + InProcessBrokerClient( + u_int32_t max_frame_size=65536, + framing::ProtocolVersion version= framing::highestProtocolVersion + ) : client::Connection(false, max_frame_size, version), + broker(version), + conversation(broker.conversation) { setConnector(broker); open(""); } - - ~InProcessBrokerClient() {} }; + +}} // namespace qpid::client + + #endif // _tests_InProcessBroker_h diff --git a/qpid/cpp/tests/ProducerConsumerTest.cpp b/qpid/cpp/tests/ProducerConsumerTest.cpp index e6d4090596..1f2aeffbc5 100644 --- a/qpid/cpp/tests/ProducerConsumerTest.cpp +++ b/qpid/cpp/tests/ProducerConsumerTest.cpp @@ -30,8 +30,9 @@ #include "AMQP_HighestVersion.h" #include "sys/AtomicCount.h" -using namespace qpid::sys; -using namespace qpid::framing; +using namespace qpid; +using namespace sys; +using namespace framing; using namespace boost; using namespace std; @@ -99,7 +100,7 @@ class ProducerConsumerTest : public CppUnit::TestCase CPPUNIT_TEST_SUITE_END(); public: - InProcessBrokerClient client; + client::InProcessBrokerClient client; ProducerConsumer pc; WatchedCounter stopped; @@ -166,7 +167,7 @@ class ProducerConsumerTest : public CppUnit::TestCase } public: - ProducerConsumerTest() : client(highestProtocolVersion) {} + ProducerConsumerTest() : client() {} void testProduceConsume() { ConsumeRunnable runMe(*this); diff --git a/qpid/cpp/tests/client_test.cpp b/qpid/cpp/tests/client_test.cpp index 413523a6a7..92952c69b1 100644 --- a/qpid/cpp/tests/client_test.cpp +++ b/qpid/cpp/tests/client_test.cpp @@ -102,7 +102,7 @@ int main(int argc, char**) Monitor monitor; SimpleListener listener(&monitor); string tag("MyTag"); - channel.getBasic().consume(queue, tag, &listener); + channel.consume(queue, tag, &listener); if (verbose) std::cout << "Registered consumer." << std::endl; //we need to enable the message dispatching for this channel @@ -115,7 +115,7 @@ int main(int argc, char**) Message msg; string data("MyMessage"); msg.setData(data); - channel.getBasic().publish(msg, exchange, "MyTopic"); + channel.publish(msg, exchange, "MyTopic"); if (verbose) std::cout << "Published message: " << data << std::endl; { diff --git a/qpid/cpp/tests/echo_service.cpp b/qpid/cpp/tests/echo_service.cpp index 412ffbeb58..ff11a336fe 100644 --- a/qpid/cpp/tests/echo_service.cpp +++ b/qpid/cpp/tests/echo_service.cpp @@ -116,7 +116,7 @@ int main(int argc, char** argv){ //Consume from the response queue, logging all echoed message to console: LoggingListener listener; std::string tag; - channel.getBasic().consume(response, tag, &listener); + channel.consume(response, tag, &listener); //Process incoming requests on a new thread channel.start(); @@ -129,7 +129,7 @@ int main(int argc, char** argv){ Message msg; msg.getHeaders().setString("RESPONSE_QUEUE", response.getName()); msg.setData(text); - channel.getBasic().publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service); + channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service); std::cout << "Enter text to send:" << std::endl; } @@ -158,10 +158,10 @@ int main(int argc, char** argv){ //Consume from the request queue, echoing back all messages received to the client that sent them EchoServer server(&channel); std::string tag = "server_tag"; - channel.getBasic().consume(request, tag, &server); + channel.consume(request, tag, &server); //Process incoming requests on the main thread - channel.getBasic().run(); + channel.run(); connection.close(); } catch(qpid::QpidError error) { @@ -184,7 +184,7 @@ void EchoServer::received(Message& message) std::cout << "Echoing " << message.getData() << " back to " << name << std::endl; //'echo' the message back: - channel->getBasic().publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name); + channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name); } } diff --git a/qpid/cpp/tests/topic_listener.cpp b/qpid/cpp/tests/topic_listener.cpp index 5f5500f7b9..5928dac49a 100644 --- a/qpid/cpp/tests/topic_listener.cpp +++ b/qpid/cpp/tests/topic_listener.cpp @@ -119,9 +119,9 @@ int main(int argc, char** argv){ //set up listener Listener listener(&channel, response.getName(), args.getTransactional()); string tag; - channel.getBasic().consume(control, tag, &listener, args.getAckMode()); + channel.consume(control, tag, &listener, args.getAckMode()); cout << "topic_listener: Consuming." << endl; - channel.getBasic().run(); + channel.run(); connection.close(); cout << "topic_listener: normal exit" << endl; return 0; @@ -166,7 +166,7 @@ void Listener::report(){ << time/TIME_MSEC << " ms."; Message msg(reportstr.str()); msg.getHeaders().setString("TYPE", "REPORT"); - channel->getBasic().publish(msg, string(), responseQueue); + channel->publish(msg, string(), responseQueue); if(transactional){ channel->commit(); } diff --git a/qpid/cpp/tests/topic_publisher.cpp b/qpid/cpp/tests/topic_publisher.cpp index 0e6c63ab35..2fd1e6b810 100644 --- a/qpid/cpp/tests/topic_publisher.cpp +++ b/qpid/cpp/tests/topic_publisher.cpp @@ -129,7 +129,7 @@ int main(int argc, char** argv) { //set up listener Publisher publisher(&channel, "topic_control", args.getTransactional()); std::string tag("mytag"); - channel.getBasic().consume(response, tag, &publisher, args.getAckMode()); + channel.consume(response, tag, &publisher, args.getAckMode()); channel.start(); int batchSize(args.getBatches()); @@ -187,12 +187,13 @@ int64_t Publisher::publish(int msgs, int listeners, int size){ { Monitor::ScopedLock l(monitor); for(int i = 0; i < msgs; i++){ - channel->getBasic().publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + channel->publish( + msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); } //send report request Message reportRequest; reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); - channel->getBasic().publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); if(transactional){ channel->commit(); } @@ -216,7 +217,7 @@ void Publisher::terminate(){ //send termination request Message terminationRequest; terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); - channel->getBasic().publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); if(transactional){ channel->commit(); } |