summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-21 19:21:59 +0000
committerAlan Conway <aconway@apache.org>2007-03-21 19:21:59 +0000
commit62921dc211aa91d28b41ea4bb59d6e1e7e08b781 (patch)
treef02b09b2ef3bc69c2198afeef9157aa3ec2de128 /cpp/lib
parentc1b0ba624ff2de40b23342cf2a96885342884dad (diff)
downloadqpid-python-62921dc211aa91d28b41ea4bb59d6e1e7e08b781.tar.gz
Removed unused files and #includes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@520976 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/client/Basic.cpp255
-rw-r--r--cpp/lib/client/Basic.h195
-rw-r--r--cpp/lib/client/Connection.h25
3 files changed, 5 insertions, 470 deletions
diff --git a/cpp/lib/client/Basic.cpp b/cpp/lib/client/Basic.cpp
deleted file mode 100644
index 4a1cf249a8..0000000000
--- a/cpp/lib/client/Basic.cpp
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include "Basic.h"
-#include "AMQMethodBody.h"
-#include "ClientChannel.h"
-#include "ReturnedMessageHandler.h"
-#include "MessageListener.h"
-#include "framing/FieldTable.h"
-#include "Connection.h"
-
-using namespace std;
-
-namespace qpid {
-namespace client {
-
-using namespace sys;
-using namespace framing;
-
-Basic::Basic(Channel& ch) : channel(ch), returnsHandler(0) {}
-
-void Basic::consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
-{
- channel.sendAndReceiveSync<BasicConsumeOkBody>(
- synch,
- new BasicConsumeBody(
- channel.version, 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
- if (synch) {
- BasicConsumeOkBody::shared_ptr response =
- boost::shared_polymorphic_downcast<BasicConsumeOkBody>(
- channel.responses.getResponse());
- tag = response->getConsumerTag();
- }
- // FIXME aconway 2007-02-20: Race condition!
- // We could receive the first message for the consumer
- // before we create the consumer below.
- // Move consumer creation to handler for BasicConsumeOkBody
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
- THROW_QPID_ERROR(CLIENT_ERROR,
- "Consumer already exists with tag="+tag);
- Consumer& c = consumers[tag];
- c.listener = listener;
- c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
- }
-}
-
-
-void Basic::cancel(const std::string& tag, bool synch) {
- Consumer c;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i == consumers.end())
- return;
- c = i->second;
- consumers.erase(i);
- }
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
- channel.sendAndReceiveSync<BasicCancelOkBody>(
- synch, new BasicCancelBody(channel.version, tag, !synch));
-}
-
-void Basic::cancelAll(){
- ConsumerMap consumersCopy;
- {
- Mutex::ScopedLock l(lock);
- consumersCopy = consumers;
- consumers.clear();
- }
- for (ConsumerMap::iterator i=consumersCopy.begin();
- i != consumersCopy.end(); ++i)
- {
- Consumer& c = i->second;
- if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
- && c.lastDeliveryTag > 0)
- {
- channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
- }
- }
-}
-
-
-
-bool Basic::get(Message& msg, const Queue& queue, AckMode ackMode) {
- // Expect a message starting with a BasicGetOk
- incoming.startGet();
- channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
- return incoming.waitGet(msg);
-}
-
-
-void Basic::publish(
- const Message& msg, const Exchange& exchange,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- const string e = exchange.getName();
- string key = routingKey;
- channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate));
- //break msg up into header frame and content frame(s) and send these
- channel.send(msg.getHeader());
- string data = msg.getData();
- uint64_t data_length = data.length();
- if(data_length > 0){
- //frame itself uses 8 bytes
- uint32_t frag_size = channel.connection->getMaxFrameSize() - 8;
- if(data_length < frag_size){
- channel.send(new AMQContentBody(data));
- }else{
- uint32_t offset = 0;
- uint32_t remaining = data_length - offset;
- while (remaining > 0) {
- uint32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
- channel.send(new AMQContentBody(frag));
-
- offset += length;
- remaining = data_length - offset;
- }
- }
- }
-}
-
-void Basic::handle(boost::shared_ptr<AMQMethodBody> method) {
- assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
- switch(method->amqpMethodId()) {
- case BasicDeliverBody::METHOD_ID:
- case BasicReturnBody::METHOD_ID:
- case BasicGetOkBody::METHOD_ID:
- case BasicGetEmptyBody::METHOD_ID:
- incoming.add(method);
- return;
- }
- throw Channel::UnknownMethod();
-}
-
-void Basic::deliver(Consumer& consumer, Message& msg){
- //record delivery tag:
- consumer.lastDeliveryTag = msg.getDeliveryTag();
-
- //allow registered listener to handle the message
- consumer.listener->received(msg);
-
- if(channel.isOpen()){
- bool multiple(false);
- switch(consumer.ackMode){
- case LAZY_ACK:
- multiple = true;
- if(++(consumer.count) < channel.getPrefetch())
- break;
- //else drop-through
- case AUTO_ACK:
- consumer.lastDeliveryTag = 0;
- channel.send(
- new BasicAckBody(
- channel.version, msg.getDeliveryTag(), multiple));
- case NO_ACK: // Nothing to do
- case CLIENT_ACK: // User code must ack.
- break;
- // TODO aconway 2007-02-22: Provide a way for user
- // to ack!
- }
- }
-
- //as it stands, transactionality is entirely orthogonal to ack
- //mode, though the acks will not be processed by the broker under
- //a transaction until it commits.
-}
-
-
-void Basic::run() {
- while(channel.isOpen()) {
- try {
- Message msg = incoming.waitDispatch();
- if(msg.getMethod()->isA<BasicReturnBody>()) {
- ReturnedMessageHandler* handler=0;
- {
- Mutex::ScopedLock l(lock);
- handler=returnsHandler;
- }
- if(handler == 0) {
- // TODO aconway 2007-02-20: proper logging.
- cout << "Message returned: " << msg.getData() << endl;
- }
- else
- handler->returned(msg);
- }
- else {
- BasicDeliverBody::shared_ptr deliverBody =
- boost::shared_polymorphic_downcast<BasicDeliverBody>(
- msg.getMethod());
- std::string tag = deliverBody->getConsumerTag();
- Consumer consumer;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if(i == consumers.end())
- THROW_QPID_ERROR(PROTOCOL_ERROR+504,
- "Unknown consumer tag=" + tag);
- consumer = i->second;
- }
- deliver(consumer, msg);
- }
- }
- catch (const ShutdownException&) {
- /* Orderly shutdown */
- }
- catch (const Exception& e) {
- // FIXME aconway 2007-02-20: Report exception to user.
- cout << "client::Basic::run() terminated by: " << e.toString()
- << "(" << typeid(e).name() << ")" << endl;
- }
- }
-}
-
-void Basic::setReturnedMessageHandler(ReturnedMessageHandler* handler){
- Mutex::ScopedLock l(lock);
- returnsHandler = handler;
-}
-
-void Basic::setQos(){
- channel.sendAndReceive<BasicQosOkBody>(
- new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
- if(channel.isTransactional())
- channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
-}
-
-
-// TODO aconway 2007-02-22: NOTES:
-// Move incoming to BasicChannel - check for uses.
-
-}} // namespace qpid::client
diff --git a/cpp/lib/client/Basic.h b/cpp/lib/client/Basic.h
deleted file mode 100644
index f6ae633ab8..0000000000
--- a/cpp/lib/client/Basic.h
+++ /dev/null
@@ -1,195 +0,0 @@
-#ifndef _client_Basic_h
-#define _client_Basic_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "IncomingMessage.h"
-#include "sys/Runnable.h"
-
-namespace qpid {
-
-namespace framing {
-class AMQMethodBody;
-class FieldTable;
-}
-
-namespace client {
-
-class Channel;
-class Message;
-class Queue;
-class Exchange;
-class MessageListener;
-class ReturnedMessageHandler;
-
-/**
- * The available acknowledgements modes.
- *
- * \ingroup clientapi
- */
-enum AckMode {
- /** No acknowledgement will be sent, broker can
- discard messages as soon as they are delivered
- to a consumer using this mode. **/
- NO_ACK = 0,
- /** Each message will be automatically
- acknowledged as soon as it is delivered to the
- application **/
- AUTO_ACK = 1,
- /** Acknowledgements will be sent automatically,
- but not for each message. **/
- LAZY_ACK = 2,
- /** The application is responsible for explicitly
- acknowledging messages. **/
- CLIENT_ACK = 3
-};
-
-
-/**
- * Represents the AMQP Basic class for sending and receiving messages.
- */
-class Basic : public sys::Runnable
-{
- public:
- Basic(Channel& parent);
-
- /**
- * Creates a 'consumer' for a queue. Messages in (or arriving
- * at) that queue will be delivered to consumers
- * asynchronously.
- *
- * @param queue a Queue instance representing the queue to
- * consume from
- *
- * @param tag an identifier to associate with the consumer
- * that can be used to cancel its subscription (if empty, this
- * will be assigned by the broker)
- *
- * @param listener a pointer to an instance of an
- * implementation of the MessageListener interface. Messages
- * received from this queue for this consumer will result in
- * invocation of the received() method on the listener, with
- * the message itself passed in.
- *
- * @param ackMode the mode of acknowledgement that the broker
- * should assume for this consumer. @see AckMode
- *
- * @param noLocal if true, this consumer will not be sent any
- * message published by this connection
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
-
- /**
- * Cancels a subscription previously set up through a call to consume().
- *
- * @param tag the identifier used (or assigned) in the consume
- * request that set up the subscription to be cancelled.
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void cancel(const std::string& tag, bool synch = true);
- /**
- * Synchronous pull of a message from a queue.
- *
- * @param msg a message object that will contain the message
- * headers and content if the call completes.
- *
- * @param queue the queue to consume from
- *
- * @param ackMode the acknowledgement mode to use (@see
- * AckMode)
- *
- * @return true if a message was succcessfully dequeued from
- * the queue, false if the queue was empty.
- */
- bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
-
- /**
- * Publishes (i.e. sends a message to the broker).
- *
- * @param msg the message to publish
- *
- * @param exchange the exchange to publish the message to
- *
- * @param routingKey the routing key to publish with
- *
- * @param mandatory if true and the exchange to which this
- * publish is directed has no matching bindings, the message
- * will be returned (see setReturnedMessageHandler()).
- *
- * @param immediate if true and there is no consumer to
- * receive this message on publication, the message will be
- * returned (see setReturnedMessageHandler()).
- */
- void publish(const Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
-
- /**
- * Set a handler for this channel that will process any
- * returned messages
- *
- * @see publish()
- */
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- /**
- * Deliver messages from the broker to the appropriate MessageListener.
- */
- void run();
-
-
- private:
-
- struct Consumer{
- MessageListener* listener;
- AckMode ackMode;
- int count;
- uint64_t lastDeliveryTag;
- };
-
- typedef std::map<std::string, Consumer> ConsumerMap;
-
- void handle(boost::shared_ptr<framing::AMQMethodBody>);
- void setQos();
- void cancelAll();
- void deliver(Consumer& consumer, Message& msg);
-
- sys::Mutex lock;
- Channel& channel;
- IncomingMessage incoming;
- ConsumerMap consumers;
- ReturnedMessageHandler* returnsHandler;
-
- // FIXME aconway 2007-02-22: Remove friendship.
- friend class Channel;
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_Basic_h*/
diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h
index b4bd311511..627266e580 100644
--- a/cpp/lib/client/Connection.h
+++ b/cpp/lib/client/Connection.h
@@ -23,25 +23,12 @@
*/
#include <map>
#include <string>
-#include <boost/shared_ptr.hpp>
-
-#include "amqp_types.h"
-#include <QpidError.h>
-#include <Connector.h>
-#include <sys/ShutdownHandler.h>
-#include <sys/TimeoutHandler.h>
-
-
-#include "framing/amqp_types.h"
-#include <framing/amqp_framing.h>
-#include <ClientExchange.h>
-#include <IncomingMessage.h>
-#include <ClientMessage.h>
-#include <MessageListener.h>
-#include <ClientQueue.h>
-#include <ResponseHandler.h>
-#include <AMQP_HighestVersion.h>
+#include "QpidError.h"
#include "ClientChannel.h"
+#include "Connector.h"
+#include "sys/ShutdownHandler.h"
+#include "sys/TimeoutHandler.h"
+
namespace qpid {
@@ -53,8 +40,6 @@ namespace qpid {
*/
namespace client {
-class Channel;
-
/**
* \internal provide access to selected private channel functions
* for the Connection without making it a friend of the entire channel.