diff options
author | Alan Conway <aconway@apache.org> | 2007-03-21 19:21:59 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-03-21 19:21:59 +0000 |
commit | 62921dc211aa91d28b41ea4bb59d6e1e7e08b781 (patch) | |
tree | f02b09b2ef3bc69c2198afeef9157aa3ec2de128 /cpp/lib | |
parent | c1b0ba624ff2de40b23342cf2a96885342884dad (diff) | |
download | qpid-python-62921dc211aa91d28b41ea4bb59d6e1e7e08b781.tar.gz |
Removed unused files and #includes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@520976 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r-- | cpp/lib/client/Basic.cpp | 255 | ||||
-rw-r--r-- | cpp/lib/client/Basic.h | 195 | ||||
-rw-r--r-- | cpp/lib/client/Connection.h | 25 |
3 files changed, 5 insertions, 470 deletions
diff --git a/cpp/lib/client/Basic.cpp b/cpp/lib/client/Basic.cpp deleted file mode 100644 index 4a1cf249a8..0000000000 --- a/cpp/lib/client/Basic.cpp +++ /dev/null @@ -1,255 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "Basic.h" -#include "AMQMethodBody.h" -#include "ClientChannel.h" -#include "ReturnedMessageHandler.h" -#include "MessageListener.h" -#include "framing/FieldTable.h" -#include "Connection.h" - -using namespace std; - -namespace qpid { -namespace client { - -using namespace sys; -using namespace framing; - -Basic::Basic(Channel& ch) : channel(ch), returnsHandler(0) {} - -void Basic::consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) -{ - channel.sendAndReceiveSync<BasicConsumeOkBody>( - synch, - new BasicConsumeBody( - channel.version, 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable())); - if (synch) { - BasicConsumeOkBody::shared_ptr response = - boost::shared_polymorphic_downcast<BasicConsumeOkBody>( - channel.responses.getResponse()); - tag = response->getConsumerTag(); - } - // FIXME aconway 2007-02-20: Race condition! - // We could receive the first message for the consumer - // before we create the consumer below. - // Move consumer creation to handler for BasicConsumeOkBody - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - THROW_QPID_ERROR(CLIENT_ERROR, - "Consumer already exists with tag="+tag); - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.lastDeliveryTag = 0; - } -} - - -void Basic::cancel(const std::string& tag, bool synch) { - Consumer c; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i == consumers.end()) - return; - c = i->second; - consumers.erase(i); - } - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); - channel.sendAndReceiveSync<BasicCancelOkBody>( - synch, new BasicCancelBody(channel.version, tag, !synch)); -} - -void Basic::cancelAll(){ - ConsumerMap consumersCopy; - { - Mutex::ScopedLock l(lock); - consumersCopy = consumers; - consumers.clear(); - } - for (ConsumerMap::iterator i=consumersCopy.begin(); - i != consumersCopy.end(); ++i) - { - Consumer& c = i->second; - if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) - && c.lastDeliveryTag > 0) - { - channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); - } - } -} - - - -bool Basic::get(Message& msg, const Queue& queue, AckMode ackMode) { - // Expect a message starting with a BasicGetOk - incoming.startGet(); - channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); - return incoming.waitGet(msg); -} - - -void Basic::publish( - const Message& msg, const Exchange& exchange, - const std::string& routingKey, bool mandatory, bool immediate) -{ - const string e = exchange.getName(); - string key = routingKey; - channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate)); - //break msg up into header frame and content frame(s) and send these - channel.send(msg.getHeader()); - string data = msg.getData(); - uint64_t data_length = data.length(); - if(data_length > 0){ - //frame itself uses 8 bytes - uint32_t frag_size = channel.connection->getMaxFrameSize() - 8; - if(data_length < frag_size){ - channel.send(new AMQContentBody(data)); - }else{ - uint32_t offset = 0; - uint32_t remaining = data_length - offset; - while (remaining > 0) { - uint32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); - channel.send(new AMQContentBody(frag)); - - offset += length; - remaining = data_length - offset; - } - } - } -} - -void Basic::handle(boost::shared_ptr<AMQMethodBody> method) { - assert(method->amqpClassId() ==BasicGetBody::CLASS_ID); - switch(method->amqpMethodId()) { - case BasicDeliverBody::METHOD_ID: - case BasicReturnBody::METHOD_ID: - case BasicGetOkBody::METHOD_ID: - case BasicGetEmptyBody::METHOD_ID: - incoming.add(method); - return; - } - throw Channel::UnknownMethod(); -} - -void Basic::deliver(Consumer& consumer, Message& msg){ - //record delivery tag: - consumer.lastDeliveryTag = msg.getDeliveryTag(); - - //allow registered listener to handle the message - consumer.listener->received(msg); - - if(channel.isOpen()){ - bool multiple(false); - switch(consumer.ackMode){ - case LAZY_ACK: - multiple = true; - if(++(consumer.count) < channel.getPrefetch()) - break; - //else drop-through - case AUTO_ACK: - consumer.lastDeliveryTag = 0; - channel.send( - new BasicAckBody( - channel.version, msg.getDeliveryTag(), multiple)); - case NO_ACK: // Nothing to do - case CLIENT_ACK: // User code must ack. - break; - // TODO aconway 2007-02-22: Provide a way for user - // to ack! - } - } - - //as it stands, transactionality is entirely orthogonal to ack - //mode, though the acks will not be processed by the broker under - //a transaction until it commits. -} - - -void Basic::run() { - while(channel.isOpen()) { - try { - Message msg = incoming.waitDispatch(); - if(msg.getMethod()->isA<BasicReturnBody>()) { - ReturnedMessageHandler* handler=0; - { - Mutex::ScopedLock l(lock); - handler=returnsHandler; - } - if(handler == 0) { - // TODO aconway 2007-02-20: proper logging. - cout << "Message returned: " << msg.getData() << endl; - } - else - handler->returned(msg); - } - else { - BasicDeliverBody::shared_ptr deliverBody = - boost::shared_polymorphic_downcast<BasicDeliverBody>( - msg.getMethod()); - std::string tag = deliverBody->getConsumerTag(); - Consumer consumer; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if(i == consumers.end()) - THROW_QPID_ERROR(PROTOCOL_ERROR+504, - "Unknown consumer tag=" + tag); - consumer = i->second; - } - deliver(consumer, msg); - } - } - catch (const ShutdownException&) { - /* Orderly shutdown */ - } - catch (const Exception& e) { - // FIXME aconway 2007-02-20: Report exception to user. - cout << "client::Basic::run() terminated by: " << e.toString() - << "(" << typeid(e).name() << ")" << endl; - } - } -} - -void Basic::setReturnedMessageHandler(ReturnedMessageHandler* handler){ - Mutex::ScopedLock l(lock); - returnsHandler = handler; -} - -void Basic::setQos(){ - channel.sendAndReceive<BasicQosOkBody>( - new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)); - if(channel.isTransactional()) - channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version)); -} - - -// TODO aconway 2007-02-22: NOTES: -// Move incoming to BasicChannel - check for uses. - -}} // namespace qpid::client diff --git a/cpp/lib/client/Basic.h b/cpp/lib/client/Basic.h deleted file mode 100644 index f6ae633ab8..0000000000 --- a/cpp/lib/client/Basic.h +++ /dev/null @@ -1,195 +0,0 @@ -#ifndef _client_Basic_h -#define _client_Basic_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "IncomingMessage.h" -#include "sys/Runnable.h" - -namespace qpid { - -namespace framing { -class AMQMethodBody; -class FieldTable; -} - -namespace client { - -class Channel; -class Message; -class Queue; -class Exchange; -class MessageListener; -class ReturnedMessageHandler; - -/** - * The available acknowledgements modes. - * - * \ingroup clientapi - */ -enum AckMode { - /** No acknowledgement will be sent, broker can - discard messages as soon as they are delivered - to a consumer using this mode. **/ - NO_ACK = 0, - /** Each message will be automatically - acknowledged as soon as it is delivered to the - application **/ - AUTO_ACK = 1, - /** Acknowledgements will be sent automatically, - but not for each message. **/ - LAZY_ACK = 2, - /** The application is responsible for explicitly - acknowledging messages. **/ - CLIENT_ACK = 3 -}; - - -/** - * Represents the AMQP Basic class for sending and receiving messages. - */ -class Basic : public sys::Runnable -{ - public: - Basic(Channel& parent); - - /** - * Creates a 'consumer' for a queue. Messages in (or arriving - * at) that queue will be delivered to consumers - * asynchronously. - * - * @param queue a Queue instance representing the queue to - * consume from - * - * @param tag an identifier to associate with the consumer - * that can be used to cancel its subscription (if empty, this - * will be assigned by the broker) - * - * @param listener a pointer to an instance of an - * implementation of the MessageListener interface. Messages - * received from this queue for this consumer will result in - * invocation of the received() method on the listener, with - * the message itself passed in. - * - * @param ackMode the mode of acknowledgement that the broker - * should assume for this consumer. @see AckMode - * - * @param noLocal if true, this consumer will not be sent any - * message published by this connection - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); - - /** - * Cancels a subscription previously set up through a call to consume(). - * - * @param tag the identifier used (or assigned) in the consume - * request that set up the subscription to be cancelled. - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void cancel(const std::string& tag, bool synch = true); - /** - * Synchronous pull of a message from a queue. - * - * @param msg a message object that will contain the message - * headers and content if the call completes. - * - * @param queue the queue to consume from - * - * @param ackMode the acknowledgement mode to use (@see - * AckMode) - * - * @return true if a message was succcessfully dequeued from - * the queue, false if the queue was empty. - */ - bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); - - /** - * Publishes (i.e. sends a message to the broker). - * - * @param msg the message to publish - * - * @param exchange the exchange to publish the message to - * - * @param routingKey the routing key to publish with - * - * @param mandatory if true and the exchange to which this - * publish is directed has no matching bindings, the message - * will be returned (see setReturnedMessageHandler()). - * - * @param immediate if true and there is no consumer to - * receive this message on publication, the message will be - * returned (see setReturnedMessageHandler()). - */ - void publish(const Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false); - - /** - * Set a handler for this channel that will process any - * returned messages - * - * @see publish() - */ - void setReturnedMessageHandler(ReturnedMessageHandler* handler); - - /** - * Deliver messages from the broker to the appropriate MessageListener. - */ - void run(); - - - private: - - struct Consumer{ - MessageListener* listener; - AckMode ackMode; - int count; - uint64_t lastDeliveryTag; - }; - - typedef std::map<std::string, Consumer> ConsumerMap; - - void handle(boost::shared_ptr<framing::AMQMethodBody>); - void setQos(); - void cancelAll(); - void deliver(Consumer& consumer, Message& msg); - - sys::Mutex lock; - Channel& channel; - IncomingMessage incoming; - ConsumerMap consumers; - ReturnedMessageHandler* returnsHandler; - - // FIXME aconway 2007-02-22: Remove friendship. - friend class Channel; -}; - -}} // namespace qpid::client - - - -#endif /*!_client_Basic_h*/ diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index b4bd311511..627266e580 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -23,25 +23,12 @@ */ #include <map> #include <string> -#include <boost/shared_ptr.hpp> - -#include "amqp_types.h" -#include <QpidError.h> -#include <Connector.h> -#include <sys/ShutdownHandler.h> -#include <sys/TimeoutHandler.h> - - -#include "framing/amqp_types.h" -#include <framing/amqp_framing.h> -#include <ClientExchange.h> -#include <IncomingMessage.h> -#include <ClientMessage.h> -#include <MessageListener.h> -#include <ClientQueue.h> -#include <ResponseHandler.h> -#include <AMQP_HighestVersion.h> +#include "QpidError.h" #include "ClientChannel.h" +#include "Connector.h" +#include "sys/ShutdownHandler.h" +#include "sys/TimeoutHandler.h" + namespace qpid { @@ -53,8 +40,6 @@ namespace qpid { */ namespace client { -class Channel; - /** * \internal provide access to selected private channel functions * for the Connection without making it a friend of the entire channel. |