diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-19 16:54:44 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-19 16:54:44 +0000 |
commit | d66aca6eb24b89052e7e2ab05ade9fcd5398bb95 (patch) | |
tree | b576cec8a33157707585a5ca63b730765276ef69 /Final/cpp/lib/client | |
parent | 96420dfa6bb12a4b9fce082d50e49910e3dc8779 (diff) | |
download | qpid-python-d66aca6eb24b89052e7e2ab05ade9fcd5398bb95.tar.gz |
Undoing the accidental move instead of a copy
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/tags/M2@596363 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'Final/cpp/lib/client')
22 files changed, 0 insertions, 2607 deletions
diff --git a/Final/cpp/lib/client/ClientChannel.cpp b/Final/cpp/lib/client/ClientChannel.cpp deleted file mode 100644 index a97d79dcf9..0000000000 --- a/Final/cpp/lib/client/ClientChannel.cpp +++ /dev/null @@ -1,434 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ClientChannel.h> -#include <sys/Monitor.h> -#include <ClientMessage.h> -#include <QpidError.h> -#include <MethodBodyInstances.h> - -using namespace boost; //to use dynamic_pointer_cast -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::sys; - -Channel::Channel(bool _transactional, u_int16_t _prefetch) : - id(0), - con(0), - out(0), - incoming(0), - closed(true), - prefetch(_prefetch), - transactional(_transactional), -// AMQP version management change - kpvdr 2006-11-20 -// TODO: Make this class version-aware and link these hard-wired numbers to that version - version(8, 0) -{ } - -Channel::~Channel(){ - stop(); -} - -void Channel::setPrefetch(u_int16_t _prefetch){ - prefetch = _prefetch; - if(con != 0 && out != 0){ - setQos(); - } -} - -void Channel::setQos(){ -// AMQP version management change - kpvdr 2006-11-20 -// TODO: Make this class version-aware and link these hard-wired numbers to that version - sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok); - if(transactional){ - sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok); - } -} - -void Channel::declareExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - string type = exchange.getType(); - FieldTable args; - AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.exchange_declare_ok); - }else{ - out->send(frame); - } -} - -void Channel::deleteExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.exchange_delete_ok); - }else{ - out->send(frame); - } -} - -void Channel::declareQueue(Queue& queue, bool synch){ - string name = queue.getName(); - FieldTable args; - AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false/*passive*/, queue.isDurable(), - queue.isExclusive(), - queue.isAutoDelete(), !synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_declare_ok); - if(queue.getName().length() == 0){ - QueueDeclareOkBody::shared_ptr response = - dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse()); - queue.setName(response->getQueue()); - } - }else{ - out->send(frame); - } -} - -void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ - //ticket, queue, ifunused, ifempty, nowait - string name = queue.getName(); - AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_delete_ok); - }else{ - out->send(frame); - } -} - -void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ - string e = exchange.getName(); - string q = queue.getName(); - AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_bind_ok); - }else{ - out->send(frame); - } -} - -void Channel::consume( - Queue& queue, std::string& tag, MessageListener* listener, - int ackMode, bool noLocal, bool synch, const FieldTable* fields) -{ - string q = queue.getName(); - AMQFrame* frame = - new AMQFrame(version, - id, - new BasicConsumeBody( - version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable())); - if(synch){ - sendAndReceive(frame, method_bodies.basic_consume_ok); - BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse()); - tag = response->getConsumerTag(); - }else{ - out->send(frame); - } - Consumer* c = new Consumer(); - c->listener = listener; - c->ackMode = ackMode; - c->lastDeliveryTag = 0; - consumers[tag] = c; -} - -void Channel::cancel(std::string& tag, bool synch){ - Consumer* c = consumers[tag]; - if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true))); - } - - AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.basic_cancel_ok); - }else{ - out->send(frame); - } - consumers.erase(tag); - if(c != 0){ - delete c; - } -} - -void Channel::cancelAll(){ - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){ - Consumer* c = i->second; - if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true))); - } - consumers.erase(i); - delete c; - } -} - -void Channel::retrieve(Message& msg){ - Monitor::ScopedLock l(retrievalMonitor); - while(retrieved == 0){ - retrievalMonitor.wait(); - } - - msg.header = retrieved->getHeader(); - msg.deliveryTag = retrieved->getDeliveryTag(); - retrieved->getData(msg.data); - delete retrieved; - retrieved = 0; -} - -bool Channel::get(Message& msg, const Queue& queue, int ackMode){ - string name = queue.getName(); - AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode)); - responses.expect(); - out->send(frame); - responses.waitForResponse(); - AMQMethodBody::shared_ptr response = responses.getResponse(); - if(method_bodies.basic_get_ok.match(response.get())){ - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response)); - } - retrieve(msg); - return true; - }if(method_bodies.basic_get_empty.match(response.get())){ - return false; - }else{ - THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get."); - } -} - - -void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ - string e = exchange.getName(); - string key = routingKey; - - out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); - //break msg up into header frame and content frame(s) and send these - string data = msg.getData(); - msg.header->setContentSize(data.length()); - AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); - out->send(new AMQFrame(version, id, body)); - - u_int64_t data_length = data.length(); - if(data_length > 0){ - u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes - if(data_length < frag_size){ - out->send(new AMQFrame(version, id, new AMQContentBody(data))); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); - out->send(new AMQFrame(version, id, new AMQContentBody(frag))); - - offset += length; - remaining = data_length - offset; - } - } - } -} - -void Channel::commit(){ - AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version)); - sendAndReceive(frame, method_bodies.tx_commit_ok); -} - -void Channel::rollback(){ - AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version)); - sendAndReceive(frame, method_bodies.tx_rollback_ok); -} - -void Channel::handleMethod(AMQMethodBody::shared_ptr body){ - //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request - if(method_bodies.basic_deliver.match(body.get())){ - if(incoming != 0){ - std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); - } - }else if(method_bodies.basic_return.match(body.get())){ - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); - } - }else if(method_bodies.channel_close.match(body.get())){ - con->removeChannel(this); - //need to signal application that channel has been closed through exception - - }else if(method_bodies.channel_flow.match(body.get())){ - - } else if(responses.isWaiting()){ - responses.signalResponse(body); - }else{ - //signal error - std::cout << "Unhandled method: " << *body << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method"); - } -} - -void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got header before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver."); - }else{ - incoming->setHeader(body); - if(incoming->isComplete()){ - enqueue(); - } - } -} - -void Channel::handleContent(AMQContentBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got content before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver."); - }else{ - incoming->addContent(body); - if(incoming->isComplete()){ - enqueue(); - } - } -} - -void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); -} - -void Channel::start(){ - dispatcher = Thread(this); -} - -void Channel::stop(){ - { - Monitor::ScopedLock l(dispatchMonitor); - closed = true; - dispatchMonitor.notify(); - } - dispatcher.join(); -} - -void Channel::run(){ - dispatch(); -} - -void Channel::enqueue(){ - if(incoming->isResponse()){ - Monitor::ScopedLock l(retrievalMonitor); - retrieved = incoming; - retrievalMonitor.notify(); - }else{ - Monitor::ScopedLock l(dispatchMonitor); - messages.push(incoming); - dispatchMonitor.notify(); - } - incoming = 0; -} - -IncomingMessage* Channel::dequeue(){ - Monitor::ScopedLock l(dispatchMonitor); - while(messages.empty() && !closed){ - dispatchMonitor.wait(); - } - IncomingMessage* msg = 0; - if(!messages.empty()){ - msg = messages.front(); - messages.pop(); - } - return msg; -} - -void Channel::deliver(Consumer* consumer, Message& msg){ - //record delivery tag: - consumer->lastDeliveryTag = msg.getDeliveryTag(); - - //allow registered listener to handle the message - consumer->listener->received(msg); - - //if the handler calls close on the channel or connection while - //handling this message, then consumer will now have been deleted. - if(!closed){ - bool multiple(false); - switch(consumer->ackMode){ - case LAZY_ACK: - multiple = true; - if(++(consumer->count) < prefetch) break; - //else drop-through - case AUTO_ACK: - out->send(new AMQFrame(version, id, new BasicAckBody(version, msg.getDeliveryTag(), multiple))); - consumer->lastDeliveryTag = 0; - } - } - - //as it stands, transactionality is entirely orthogonal to ack - //mode, though the acks will not be processed by the broker under - //a transaction until it commits. -} - -void Channel::dispatch(){ - while(!closed){ - IncomingMessage* incomingMsg = dequeue(); - if(incomingMsg){ - //Note: msg is currently only valid for duration of this call - Message msg(incomingMsg->getHeader()); - incomingMsg->getData(msg.data); - if(incomingMsg->isReturn()){ - if(returnsHandler == 0){ - //print warning to log/console - std::cout << "Message returned: " << msg.getData() << std::endl; - }else{ - returnsHandler->returned(msg); - } - }else{ - msg.deliveryTag = incomingMsg->getDeliveryTag(); - std::string tag = incomingMsg->getConsumerTag(); - - if(consumers[tag] == 0){ - //signal error - std::cout << "Unknown consumer: " << tag << std::endl; - }else{ - deliver(consumers[tag], msg); - } - } - delete incomingMsg; - } - } -} - -void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ - returnsHandler = handler; -} - -void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ - responses.expect(); - out->send(frame); - responses.receive(body); -} - -void Channel::close(){ - if(con != 0){ - con->closeChannel(this); - } -} diff --git a/Final/cpp/lib/client/ClientChannel.h b/Final/cpp/lib/client/ClientChannel.h deleted file mode 100644 index 066f837430..0000000000 --- a/Final/cpp/lib/client/ClientChannel.h +++ /dev/null @@ -1,321 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <map> -#include <string> -#include <queue> -#include "sys/types.h" - -#ifndef _Channel_ -#define _Channel_ - -#include <framing/amqp_framing.h> -#include <Connection.h> -#include <ClientExchange.h> -#include <IncomingMessage.h> -#include <ClientMessage.h> -#include <MessageListener.h> -#include <ClientQueue.h> -#include <ResponseHandler.h> -#include <ReturnedMessageHandler.h> - -namespace qpid { -namespace client { - /** - * The available acknowledgements modes - * - * \ingroup clientapi - */ - enum ack_modes { - /** No acknowledgement will be sent, broker can - discard messages as soon as they are delivered - to a consumer using this mode. **/ - NO_ACK = 0, - /** Each message will be automatically - acknowledged as soon as it is delivered to the - application **/ - AUTO_ACK = 1, - /** Acknowledgements will be sent automatically, - but not for each message. **/ - LAZY_ACK = 2, - /** The application is responsible for explicitly - acknowledging messages. **/ - CLIENT_ACK = 3 - }; - - /** - * Represents an AMQP channel, i.e. loosely a session of work. It - * is through a channel that most of the AMQP 'methods' are - * exposed. - * - * \ingroup clientapi - */ - class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::sys::Runnable{ - struct Consumer{ - MessageListener* listener; - int ackMode; - int count; - u_int64_t lastDeliveryTag; - }; - typedef std::map<std::string,Consumer*>::iterator consumer_iterator; - - u_int16_t id; - Connection* con; - qpid::sys::Thread dispatcher; - qpid::framing::OutputHandler* out; - IncomingMessage* incoming; - ResponseHandler responses; - std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume - IncomingMessage* retrieved;//holds response to basic.get - qpid::sys::Monitor dispatchMonitor; - qpid::sys::Monitor retrievalMonitor; - std::map<std::string, Consumer*> consumers; - ReturnedMessageHandler* returnsHandler; - bool closed; - - u_int16_t prefetch; - const bool transactional; - qpid::framing::ProtocolVersion version; - - void enqueue(); - void retrieve(Message& msg); - IncomingMessage* dequeue(); - void dispatch(); - void stop(); - void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); - void deliver(Consumer* consumer, Message& msg); - void setQos(); - void cancelAll(); - - virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); - virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); - virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); - virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); - - public: - /** - * Creates a channel object. - * - * @param transactional if true, the publishing and acknowledgement - * of messages will be transactional and can be committed or - * aborted in atomic units (@see commit(), @see rollback()) - * - * @param prefetch specifies the number of unacknowledged - * messages the channel is willing to have sent to it - * asynchronously - */ - Channel(bool transactional = false, u_int16_t prefetch = 500); - ~Channel(); - - /** - * Declares an exchange. - * - * In AMQP Exchanges are the destinations to which messages - * are published. They have Queues bound to them and route - * messages they receive to those queues. The routing rules - * depend on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareExchange(Exchange& exchange, bool synch = true); - /** - * Deletes an exchange - * - * @param exchange an Exchange object representing the exchange to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteExchange(Exchange& exchange, bool synch = true); - /** - * Declares a Queue - * - * @param queue a Queue object representing the queue to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareQueue(Queue& queue, bool synch = true); - /** - * Deletes a Queue - * - * @param queue a Queue object representing the queue to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); - /** - * Binds a queue to an exchange. The exact semantics of this - * (in particular how 'routing keys' and 'binding arguments' - * are used) depends on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to bind to - * - * @param queue a Queue object representing the queue to be - * bound - * - * @param key the 'routing key' for the binding - * - * @param args the 'binding arguments' for the binding - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void bind(const Exchange& exchange, const Queue& queue, const std::string& key, - const qpid::framing::FieldTable& args, bool synch = true); - /** - * Creates a 'consumer' for a queue. Messages in (or arriving - * at) that queue will be delivered to consumers - * asynchronously. - * - * @param queue a Queue instance representing the queue to - * consume from - * - * @param tag an identifier to associate with the consumer - * that can be used to cancel its subscription (if empty, this - * will be assigned by the broker) - * - * @param listener a pointer to an instance of an - * implementation of the MessageListener interface. Messages - * received from this queue for this consumer will result in - * invocation of the received() method on the listener, with - * the message itself passed in. - * - * @param ackMode the mode of acknowledgement that the broker - * should assume for this consumer. @see ack_modes - * - * @param noLocal if true, this consumer will not be sent any - * message published by this connection - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void consume( - Queue& queue, std::string& tag, MessageListener* listener, - int ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const qpid::framing::FieldTable* fields = 0); - - /** - * Cancels a subscription previously set up through a call to consume(). - * - * @param tag the identifier used (or assigned) in the consume - * request that set up the subscription to be cancelled. - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void cancel(std::string& tag, bool synch = true); - /** - * Synchronous pull of a message from a queue. - * - * @param msg a message object that will contain the message - * headers and content if the call completes. - * - * @param queue the queue to consume from - * - * @param ackMode the acknowledgement mode to use (@see - * ack_modes) - * - * @return true if a message was succcessfully dequeued from - * the queue, false if the queue was empty. - */ - bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK); - /** - * Publishes (i.e. sends a message to the broker). - * - * @param msg the message to publish - * - * @param exchange the exchange to publish the message to - * - * @param routingKey the routing key to publish with - * - * @param mandatory if true and the exchange to which this - * publish is directed has no matching bindings, the message - * will be returned (see setReturnedMessageHandler()). - * - * @param immediate if true and there is no consumer to - * receive this message on publication, the message will be - * returned (see setReturnedMessageHandler()). - */ - void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, - bool mandatory = false, bool immediate = false); - - /** - * For a transactional channel this will commit all - * publications and acknowledgements since the last commit (or - * the channel was opened if there has been no previous - * commit). This will cause published messages to become - * available to consumers and acknowledged messages to be - * consumed and removed from the queues they were dispatched - * from. - * - * Transactionailty of a channel is specified when the channel - * object is created (@see Channel()). - */ - void commit(); - /** - * For a transactional channel, this will rollback any - * publications or acknowledgements. It will be as if the - * ppblished messages were never sent and the acknowledged - * messages were never consumed. - */ - void rollback(); - - /** - * Change the prefetch in use. - */ - void setPrefetch(u_int16_t prefetch); - - /** - * Start message dispatching on a new thread - */ - void start(); - /** - * Do message dispatching on this thread - */ - void run(); - - /** - * Closes a channel, stopping any message dispatching. - */ - void close(); - - /** - * Set a handler for this channel that will process any - * returned messages - * - * @see publish() - */ - void setReturnedMessageHandler(ReturnedMessageHandler* handler); - - friend class Connection; - }; - -} -} - - -#endif diff --git a/Final/cpp/lib/client/ClientExchange.cpp b/Final/cpp/lib/client/ClientExchange.cpp deleted file mode 100644 index 5e5f3f14c6..0000000000 --- a/Final/cpp/lib/client/ClientExchange.cpp +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ClientExchange.h> - -qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} -const std::string& qpid::client::Exchange::getName() const { return name; } -const std::string& qpid::client::Exchange::getType() const { return type; } - -const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; -const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; -const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; - -const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/Final/cpp/lib/client/ClientExchange.h b/Final/cpp/lib/client/ClientExchange.h deleted file mode 100644 index a8ac21fa9b..0000000000 --- a/Final/cpp/lib/client/ClientExchange.h +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _Exchange_ -#define _Exchange_ - -namespace qpid { -namespace client { - - /** - * A 'handle' used to represent an AMQP exchange in the Channel - * methods. Exchanges are the destinations to which messages are - * published. - * - * There are different types of exchange (the standard types are - * available as static constants, see DIRECT_EXCHANGE, - * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to - * an exchange using Channel::bind() and messages published to - * that exchange are then routed to the queue based on the details - * of the binding and the type of exchange. - * - * There are some standard exchange instances that are predeclared - * on all AMQP brokers. These are defined as static members - * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and - * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange - * (member DEFAULT_EXCHANGE) which is nameless and of type - * 'direct' and has every declared queue bound to it by queue - * name. - * - * \ingroup clientapi - */ - class Exchange{ - const std::string name; - const std::string type; - - public: - /** - * A direct exchange routes messages published with routing - * key X to any queue bound with key X (i.e. an exact match is - * used). - */ - static const std::string DIRECT_EXCHANGE; - /** - * A topic exchange treat the key with which a queue is bound - * as a pattern and routes all messages whose routing keys - * match that pattern to the bound queue. The routing key for - * a message must consist of zero or more alpha-numeric words - * delimited by dots. The pattern is of a similar form but * - * can be used to match excatly one word and # can be used to - * match zero or more words. - */ - static const std::string TOPIC_EXCHANGE; - /** - * The headers exchange routes messages based on whether their - * headers match the binding arguments specified when - * binding. (see the AMQP spec for more details). - */ - static const std::string HEADERS_EXCHANGE; - - /** - * The 'default' exchange, nameless and of type 'direct'. Has - * every declared queue bound to it by name. - */ - static const Exchange DEFAULT_EXCHANGE; - /** - * The standard direct exchange, named amq.direct. - */ - static const Exchange STANDARD_DIRECT_EXCHANGE; - /** - * The standard topic exchange, named amq.topic. - */ - static const Exchange STANDARD_TOPIC_EXCHANGE; - /** - * The standard headers exchange, named amq.header. - */ - static const Exchange STANDARD_HEADERS_EXCHANGE; - - Exchange(std::string name, std::string type = DIRECT_EXCHANGE); - const std::string& getName() const; - const std::string& getType() const; - }; - -} -} - - -#endif diff --git a/Final/cpp/lib/client/ClientMessage.cpp b/Final/cpp/lib/client/ClientMessage.cpp deleted file mode 100644 index e8a2a6019e..0000000000 --- a/Final/cpp/lib/client/ClientMessage.cpp +++ /dev/null @@ -1,150 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ClientMessage.h> - -using namespace qpid::client; -using namespace qpid::framing; - -Message::Message(){ - header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)); -} - -Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ -} - -Message::~Message(){ -} - -BasicHeaderProperties* Message::getHeaderProperties(){ - return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); -} - -const std::string& Message::getContentType(){ - return getHeaderProperties()->getContentType(); -} - -const std::string& Message::getContentEncoding(){ - return getHeaderProperties()->getContentEncoding(); -} - -FieldTable& Message::getHeaders(){ - return getHeaderProperties()->getHeaders(); -} - -u_int8_t Message::getDeliveryMode(){ - return getHeaderProperties()->getDeliveryMode(); -} - -u_int8_t Message::getPriority(){ - return getHeaderProperties()->getPriority(); -} - -const std::string& Message::getCorrelationId(){ - return getHeaderProperties()->getCorrelationId(); -} - -const std::string& Message::getReplyTo(){ - return getHeaderProperties()->getReplyTo(); -} - -const std::string& Message::getExpiration(){ - return getHeaderProperties()->getExpiration(); -} - -const std::string& Message::getMessageId(){ - return getHeaderProperties()->getMessageId(); -} - -u_int64_t Message::getTimestamp(){ - return getHeaderProperties()->getTimestamp(); -} - -const std::string& Message::getType(){ - return getHeaderProperties()->getType(); -} - -const std::string& Message::getUserId(){ - return getHeaderProperties()->getUserId(); -} - -const std::string& Message::getAppId(){ - return getHeaderProperties()->getAppId(); -} - -const std::string& Message::getClusterId(){ - return getHeaderProperties()->getClusterId(); -} - -void Message::setContentType(const std::string& type){ - getHeaderProperties()->setContentType(type); -} - -void Message::setContentEncoding(const std::string& encoding){ - getHeaderProperties()->setContentEncoding(encoding); -} - -void Message::setHeaders(const FieldTable& headers){ - getHeaderProperties()->setHeaders(headers); -} - -void Message::setDeliveryMode(u_int8_t mode){ - getHeaderProperties()->setDeliveryMode(mode); -} - -void Message::setPriority(u_int8_t priority){ - getHeaderProperties()->setPriority(priority); -} - -void Message::setCorrelationId(const std::string& correlationId){ - getHeaderProperties()->setCorrelationId(correlationId); -} - -void Message::setReplyTo(const std::string& replyTo){ - getHeaderProperties()->setReplyTo(replyTo); -} - -void Message::setExpiration(const std::string& expiration){ - getHeaderProperties()->setExpiration(expiration); -} - -void Message::setMessageId(const std::string& messageId){ - getHeaderProperties()->setMessageId(messageId); -} - -void Message::setTimestamp(u_int64_t timestamp){ - getHeaderProperties()->setTimestamp(timestamp); -} - -void Message::setType(const std::string& type){ - getHeaderProperties()->setType(type); -} - -void Message::setUserId(const std::string& userId){ - getHeaderProperties()->setUserId(userId); -} - -void Message::setAppId(const std::string& appId){ - getHeaderProperties()->setAppId(appId); -} - -void Message::setClusterId(const std::string& clusterId){ - getHeaderProperties()->setClusterId(clusterId); -} diff --git a/Final/cpp/lib/client/ClientMessage.h b/Final/cpp/lib/client/ClientMessage.h deleted file mode 100644 index b46eb0bc72..0000000000 --- a/Final/cpp/lib/client/ClientMessage.h +++ /dev/null @@ -1,114 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <framing/amqp_framing.h> - -#ifndef _Message_ -#define _Message_ - - -namespace qpid { -namespace client { - - /** - * A representation of messages for sent or recived through the - * client api. - * - * \ingroup clientapi - */ - class Message{ - qpid::framing::AMQHeaderBody::shared_ptr header; - std::string data; - bool redelivered; - u_int64_t deliveryTag; - - qpid::framing::BasicHeaderProperties* getHeaderProperties(); - Message(qpid::framing::AMQHeaderBody::shared_ptr& header); - public: - Message(); - ~Message(); - - /** - * Allows the application to access the content of messages - * received. - * - * @return a string representing the data of the message - */ - inline std::string getData(){ return data; } - /** - * Allows the application to set the content of messages to be - * sent. - * - * @param data a string representing the data of the message - */ - inline void setData(const std::string& _data){ data = _data; } - - /** - * @return true if this message was delivered previously (to - * any consumer) but was not acknowledged. - */ - inline bool isRedelivered(){ return redelivered; } - inline void setRedelivered(bool _redelivered){ redelivered = _redelivered; } - - inline u_int64_t getDeliveryTag(){ return deliveryTag; } - - const std::string& getContentType(); - const std::string& getContentEncoding(); - qpid::framing::FieldTable& getHeaders(); - u_int8_t getDeliveryMode(); - u_int8_t getPriority(); - const std::string& getCorrelationId(); - const std::string& getReplyTo(); - const std::string& getExpiration(); - const std::string& getMessageId(); - u_int64_t getTimestamp(); - const std::string& getType(); - const std::string& getUserId(); - const std::string& getAppId(); - const std::string& getClusterId(); - - void setContentType(const std::string& type); - void setContentEncoding(const std::string& encoding); - void setHeaders(const qpid::framing::FieldTable& headers); - /** - * Sets the delivery mode. 1 = non-durable, 2 = durable. - */ - void setDeliveryMode(u_int8_t mode); - void setPriority(u_int8_t priority); - void setCorrelationId(const std::string& correlationId); - void setReplyTo(const std::string& replyTo); - void setExpiration(const std::string& expiration); - void setMessageId(const std::string& messageId); - void setTimestamp(u_int64_t timestamp); - void setType(const std::string& type); - void setUserId(const std::string& userId); - void setAppId(const std::string& appId); - void setClusterId(const std::string& clusterId); - - - friend class Channel; - }; - -} -} - - -#endif diff --git a/Final/cpp/lib/client/ClientQueue.cpp b/Final/cpp/lib/client/ClientQueue.cpp deleted file mode 100644 index 773be504d8..0000000000 --- a/Final/cpp/lib/client/ClientQueue.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ClientQueue.h> - -qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){} - -qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){} - -qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){} - -qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable) - : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){} - -const std::string& qpid::client::Queue::getName() const{ - return name; -} - -void qpid::client::Queue::setName(const std::string& _name){ - name = _name; -} - -bool qpid::client::Queue::isAutoDelete() const{ - return autodelete; -} - -bool qpid::client::Queue::isExclusive() const{ - return exclusive; -} - -bool qpid::client::Queue::isDurable() const{ - return durable; -} - -void qpid::client::Queue::setDurable(bool _durable){ - durable = _durable; -} - - - - diff --git a/Final/cpp/lib/client/ClientQueue.h b/Final/cpp/lib/client/ClientQueue.h deleted file mode 100644 index 4a63097c55..0000000000 --- a/Final/cpp/lib/client/ClientQueue.h +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _Queue_ -#define _Queue_ - -namespace qpid { -namespace client { - - /** - * A 'handle' used to represent an AMQP queue in the Channel - * methods. Creating an instance of this class does not cause the - * queue to be created on the broker. Rather, an instance of this - * class should be passed to Channel::declareQueue() to ensure - * that the queue exists or is created. - * - * Queues hold messages and allow clients to consume - * (see Channel::consume()) or get (see Channel::get()) those messags. A - * queue receives messages by being bound to one or more Exchange; - * messages published to that exchange may then be routed to the - * queue based on the details of the binding and the type of the - * exchange (see Channel::bind()). - * - * Queues are identified by a name. They can be exclusive (in which - * case they can only be used in the context of the connection - * over which they were declared, and are deleted when then - * connection closes), or they can be shared. Shared queues can be - * auto deleted when they have no consumers. - * - * We use the term 'temporary queue' to refer to an exclusive - * queue. - * - * \ingroup clientapi - */ - class Queue{ - std::string name; - const bool autodelete; - const bool exclusive; - bool durable; - - public: - - /** - * Creates an unnamed, non-durable, temporary queue. A name - * will be assigned to this queue instance by a call to - * Channel::declareQueue(). - */ - Queue(); - /** - * Creates a shared, non-durable, queue with a given name, - * that will not be autodeleted. - * - * @param name the name of the queue - */ - Queue(std::string name); - /** - * Creates a non-durable queue with a given name. - * - * @param name the name of the queue - * - * @param temp if true the queue will be a temporary queue, if - * false it will be shared and not autodeleted. - */ - Queue(std::string name, bool temp); - /** - * This constructor allows the autodelete, exclusive and - * durable propeties to be explictly set. Note however that if - * exclusive is true, autodelete has no meaning as exclusive - * queues are always destroyed when the connection that - * created them is closed. - */ - Queue(std::string name, bool autodelete, bool exclusive, bool durable); - const std::string& getName() const; - void setName(const std::string&); - bool isAutoDelete() const; - bool isExclusive() const; - bool isDurable() const; - void setDurable(bool durable); - }; - -} -} - - -#endif diff --git a/Final/cpp/lib/client/Connection.cpp b/Final/cpp/lib/client/Connection.cpp deleted file mode 100644 index c00b58a4a9..0000000000 --- a/Final/cpp/lib/client/Connection.cpp +++ /dev/null @@ -1,250 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <Connection.h> -#include <ClientChannel.h> -#include <ClientMessage.h> -#include <QpidError.h> -#include <iostream> -#include <MethodBodyInstances.h> - -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::sys; -using namespace qpid::sys; - -Connection::Connection( bool _debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : - debug(_debug), - channelIdCounter(0), - max_frame_size(_max_frame_size), - closed(true), - version(_version->getMajor(),_version->getMinor()), - tcpNoDelay(false) -{ - connector = new Connector(version, debug, _max_frame_size); -} - -Connection::~Connection(){ - delete connector; -} - -void Connection::setTcpNoDelay(bool on) { - tcpNoDelay = on; -} - -void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ - host = _host; - port = _port; - connector->setInputHandler(this); - connector->setTimeoutHandler(this); - connector->setShutdownHandler(this); - out = connector->getOutputHandler(); - connector->connect(host, port, tcpNoDelay); - - ProtocolInitiation* header = new ProtocolInitiation(version); - responses.expect(); - connector->init(header); - responses.receive(method_bodies.connection_start); - - FieldTable props; - string mechanism("PLAIN"); - string response = ((char)0) + uid + ((char)0) + pwd; - string locale("en_US"); - responses.expect(); - out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); - - /** - * Assume for now that further challenges will not be required - //receive connection.secure - responses.receive(connection_secure)); - //send connection.secure-ok - out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); - **/ - - responses.receive(method_bodies.connection_tune); - - ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); - out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); - - u_int16_t heartbeat = proposal->getHeartbeat(); - connector->setReadTimeout(heartbeat * 2); - connector->setWriteTimeout(heartbeat); - - //send connection.open - string capabilities; - string vhost = virtualhost; - responses.expect(); - out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true))); - //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). - responses.waitForResponse(); - if(responses.validate(method_bodies.connection_open_ok)){ - //ok - }else if(responses.validate(method_bodies.connection_redirect)){ - //ignore for now - ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); - std::cout << "Received redirection to " << redirect->getHost() << std::endl; - }else{ - THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); - } - closed = false; -} - -void Connection::close(){ - if(!closed){ - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - - sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); - connector->close(); - closed = true; - } -} - -void Connection::openChannel(Channel* channel){ - channel->con = this; - channel->id = ++channelIdCounter; - channel->out = out; - channels[channel->id] = channel; - //now send frame to open channel and wait for response - string oob; - channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); - channel->setQos(); - channel->closed = false; -} - -void Connection::closeChannel(Channel* channel){ - //send frame to close channel - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - closeChannel(channel, code, text, classId, methodId); -} - -void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ - //send frame to close channel - channel->cancelAll(); - channel->closed = true; - channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); - channel->con = 0; - channel->out = 0; - removeChannel(channel); -} - -void Connection::removeChannel(Channel* channel){ - //send frame to close channel - - channels.erase(channel->id); - channel->out = 0; - channel->id = 0; - channel->con = 0; -} - -void Connection::received(AMQFrame* frame){ - u_int16_t channelId = frame->getChannel(); - - if(channelId == 0){ - this->handleBody(frame->getBody()); - }else{ - Channel* channel = channels[channelId]; - if(channel == 0){ - error(504, "Unknown channel"); - }else{ - try{ - channel->handleBody(frame->getBody()); - }catch(qpid::QpidError e){ - channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); - } - } - } -} - -void Connection::handleMethod(AMQMethodBody::shared_ptr body){ - //connection.close, basic.deliver, basic.return or a response to a synchronous request - if(responses.isWaiting()){ - responses.signalResponse(body); - }else if(method_bodies.connection_close.match(body.get())){ - //send back close ok - //close socket - ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); - std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; - connector->close(); - }else{ - std::cout << "Unhandled method for connection: " << *body << std::endl; - error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); - } -} - -void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){ - error(504, "Channel error: received header body with channel 0."); -} - -void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){ - error(504, "Channel error: received content body with channel 0."); -} - -void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ -} - -void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ - responses.expect(); - out->send(frame); - responses.receive(body); -} - -void Connection::error(int code, const string& msg, int classid, int methodid){ - std::cout << "Connection exception generated: " << code << msg; - if(classid || methodid){ - std::cout << " [" << methodid << ":" << classid << "]"; - } - std::cout << std::endl; - sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); - connector->close(); -} - -void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ - std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl; - int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; - string msg = e.msg; - if(method == 0){ - closeChannel(channel, code, msg); - }else{ - closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); - } -} - -void Connection::idleIn(){ - std::cout << "Connection timed out due to abscence of heartbeat." << std::endl; - connector->close(); -} - -void Connection::idleOut(){ - out->send(new AMQFrame(version, 0, new AMQHeartbeatBody())); -} - -void Connection::shutdown(){ - closed = true; - //close all channels - for(iterator i = channels.begin(); i != channels.end(); i++){ - i->second->stop(); - } -} diff --git a/Final/cpp/lib/client/Connection.h b/Final/cpp/lib/client/Connection.h deleted file mode 100644 index 2222250188..0000000000 --- a/Final/cpp/lib/client/Connection.h +++ /dev/null @@ -1,182 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <map> -#include <string> - -#ifndef _Connection_ -#define _Connection_ - -#include <QpidError.h> -#include <Connector.h> -#include <sys/ShutdownHandler.h> -#include <sys/TimeoutHandler.h> - -#include <framing/amqp_framing.h> -#include <ClientExchange.h> -#include <IncomingMessage.h> -#include <ClientMessage.h> -#include <MessageListener.h> -#include <ClientQueue.h> -#include <ResponseHandler.h> -#include <AMQP_HighestVersion.h> - -namespace qpid { - - /** - * The client namespace contains all classes that make up a client - * implementation of the AMQP protocol. The key classes that form - * the basis of the client API to be used by applications are - * Connection and Channel. - */ -namespace client { - - class Channel; - - /** - * \defgroup clientapi Application API for an AMQP client - */ - - /** - * Represents a connection to an AMQP broker. All communication is - * initiated by establishing a connection, then opening one or - * more Channels over that connection. - * - * \ingroup clientapi - */ - class Connection : public virtual qpid::framing::InputHandler, - public virtual qpid::sys::TimeoutHandler, - public virtual qpid::sys::ShutdownHandler, - private virtual qpid::framing::BodyHandler{ - - typedef std::map<int, Channel*>::iterator iterator; - - const bool debug; - u_int16_t channelIdCounter; - - std::string host; - int port; - const u_int32_t max_frame_size; - std::map<int, Channel*> channels; - Connector* connector; - qpid::framing::OutputHandler* out; - ResponseHandler responses; - volatile bool closed; - qpid::framing::ProtocolVersion version; - bool tcpNoDelay; - - void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e); - void error(int code, const std::string& msg, int classid = 0, int methodid = 0); - void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0); - void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); - - virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); - virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); - virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); - virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); - - public: - /** - * Creates a connection object, but does not open the - * connection. - * - * @param _version the version of the protocol to connect with - * - * @param debug turns on tracing for the connection - * (i.e. prints details of the frames sent and received to std - * out). Optional and defaults to false. - * - * @param max_frame_size the maximum frame size that the - * client will accept. Optional and defaults to 65536. - */ - Connection( bool debug = false, u_int32_t max_frame_size = 65536, - qpid::framing::ProtocolVersion* _version = &(qpid::framing::highestProtocolVersion)); - ~Connection(); - - void setTcpNoDelay(bool on); - - /** - * Opens a connection to a broker. - * - * @param host the host on which the broker is running - * - * @param port the port on the which the broker is listening - * - * @param uid the userid to connect with - * - * @param pwd the password to connect with (currently SASL - * PLAIN is the only authentication method supported so this - * is sent in clear text) - * - * @param virtualhost the AMQP virtual host to use (virtual - * hosts, where implemented(!), provide namespace partitioning - * within a single broker). - */ - void open(const std::string& host, int port = 5672, - const std::string& uid = "guest", const std::string& pwd = "guest", - const std::string& virtualhost = ""); - /** - * Closes the connection. Any further use of this connection - * (without reopening it) will not succeed. - */ - void close(); - /** - * Opens a Channel. In AMQP channels are like multi-plexed - * 'sessions' of work over a connection. Almost all the - * interaction with AMQP is done over a channel. - * - * @param channel a pointer to a channel instance that will be - * used to represent the new channel. - */ - void openChannel(Channel* channel); - /* - * Requests that the server close this channel, then removes - * the association to the channel from this connection - * - * @param channel a pointer to the channel instance to close - */ - void closeChannel(Channel* channel); - /* - * Removes the channel from association with this connection, - * without sending a close request to the server. - * - * @param channel a pointer to the channel instance to - * disassociate - */ - void removeChannel(Channel* channel); - - virtual void received(qpid::framing::AMQFrame* frame); - - virtual void idleOut(); - virtual void idleIn(); - - virtual void shutdown(); - - /** - * @return the maximum frame size in use on this connection - */ - inline u_int32_t getMaxFrameSize(){ return max_frame_size; } - }; - -} -} - - -#endif diff --git a/Final/cpp/lib/client/Connector.cpp b/Final/cpp/lib/client/Connector.cpp deleted file mode 100644 index a99360b840..0000000000 --- a/Final/cpp/lib/client/Connector.cpp +++ /dev/null @@ -1,195 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <QpidError.h> -#include <sys/Time.h> -#include "Connector.h" - -using namespace qpid::sys; -using namespace qpid::client; -using namespace qpid::framing; -using qpid::QpidError; - -Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) : - debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - version(pVersion), - closed(true), - lastIn(0), lastOut(0), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size){ } - -Connector::~Connector(){ } - -void Connector::connect(const std::string& host, int port, bool tcpNoDelay){ - socket = Socket::createTcp(); - if (tcpNoDelay) { - socket.setTcpNoDelay(true); - } - socket.connect(host, port); - closed = false; - receiver = Thread(this); -} - -void Connector::init(ProtocolInitiation* header){ - writeBlock(header); - delete header; -} - -void Connector::close(){ - if (markClosed()) { - socket.close(); - receiver.join(); - } -} - -void Connector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void Connector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* Connector::getOutputHandler(){ - return this; -} - -void Connector::send(AMQFrame* frame){ - writeBlock(frame); - if(debug) std::cout << "SENT: " << *frame << std::endl; - delete frame; -} - -void Connector::writeBlock(AMQDataBlock* data){ - Mutex::ScopedLock l(writeLock); - data->encode(outbuf); - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); -} - -void Connector::writeToSocket(char* data, size_t available){ - size_t written = 0; - while(written < available && !closed){ - ssize_t sent = socket.send(data + written, available-written); - if(sent > 0) { - lastOut = now() * TIME_MSEC; - written += sent; - } - } -} - -void Connector::handleClosed(){ - if (markClosed()) { - socket.close(); - if(shutdownHandler) shutdownHandler->shutdown(); - } -} - -bool Connector::markClosed(){ - if (closed) { - return false; - } else { - closed = true; - return true; - } -} - -void Connector::checkIdle(ssize_t status){ - if(timeoutHandler){ - Time t = now() * TIME_MSEC; - if(status == Socket::SOCKET_TIMEOUT) { - if(idleIn && (t - lastIn > idleIn)){ - timeoutHandler->idleIn(); - } - }else if(status == Socket::SOCKET_EOF){ - handleClosed(); - }else{ - lastIn = t; - } - if(idleOut && (t - lastOut > idleOut)){ - timeoutHandler->idleOut(); - } - } -} - -void Connector::setReadTimeout(u_int16_t t){ - idleIn = t * 1000;//t is in secs - if(idleIn && (!timeout || idleIn < timeout)){ - timeout = idleIn; - setSocketTimeout(); - } - -} - -void Connector::setWriteTimeout(u_int16_t t){ - idleOut = t * 1000;//t is in secs - if(idleOut && (!timeout || idleOut < timeout)){ - timeout = idleOut; - setSocketTimeout(); - } -} - -void Connector::setSocketTimeout(){ - socket.setTimeout(timeout*TIME_MSEC); -} - -void Connector::setTimeoutHandler(TimeoutHandler* handler){ - timeoutHandler = handler; -} - -void Connector::run(){ - try{ - while(!closed){ - ssize_t available = inbuf.available(); - if(available < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - ssize_t received = socket.recv(inbuf.start(), available); - checkIdle(received); - - if(!closed && received > 0){ - inbuf.move(received); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame(version); - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: " << frame << std::endl; - input->received(&frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg - << " (" << error.location.file << ":" << error.location.line - << ")" << std::endl; - handleClosed(); - } -} diff --git a/Final/cpp/lib/client/Connector.h b/Final/cpp/lib/client/Connector.h deleted file mode 100644 index 44112369dc..0000000000 --- a/Final/cpp/lib/client/Connector.h +++ /dev/null @@ -1,97 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Connector_ -#define _Connector_ - - -#include <framing/InputHandler.h> -#include <framing/OutputHandler.h> -#include <framing/InitiationHandler.h> -#include <framing/ProtocolInitiation.h> -#include <ProtocolVersion.h> -#include <sys/ShutdownHandler.h> -#include <sys/TimeoutHandler.h> -#include <sys/Thread.h> -#include <sys/Monitor.h> -#include <sys/Socket.h> - -namespace qpid { -namespace client { - - class Connector : public qpid::framing::OutputHandler, - private qpid::sys::Runnable - { - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - qpid::framing::ProtocolVersion version; - - volatile bool closed; - - int64_t lastIn; - int64_t lastOut; - int64_t timeout; - u_int32_t idleIn; - u_int32_t idleOut; - - qpid::sys::TimeoutHandler* timeoutHandler; - qpid::sys::ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; - - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; - - qpid::sys::Mutex writeLock; - qpid::sys::Thread receiver; - - qpid::sys::Socket socket; - - void checkIdle(ssize_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - void handleClosed(); - bool markClosed(); - - public: - Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024); - virtual ~Connector(); - virtual void connect(const std::string& host, int port, bool tcpNoDelay=false); - virtual void init(qpid::framing::ProtocolInitiation* header); - virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); - virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void setReadTimeout(u_int16_t timeout); - virtual void setWriteTimeout(u_int16_t timeout); - }; - -} -} - - -#endif diff --git a/Final/cpp/lib/client/IncomingMessage.cpp b/Final/cpp/lib/client/IncomingMessage.cpp deleted file mode 100644 index 2ff143ba94..0000000000 --- a/Final/cpp/lib/client/IncomingMessage.cpp +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <IncomingMessage.h> -#include <QpidError.h> -#include <iostream> - -using namespace qpid::client; -using namespace qpid::framing; - -IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){} -IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){} -IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){} - -IncomingMessage::~IncomingMessage(){ -} - -void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){ - this->header = _header; -} - -void IncomingMessage::addContent(AMQContentBody::shared_ptr _content){ - this->content.push_back(_content); -} - -bool IncomingMessage::isComplete(){ - return header != 0 && header->getContentSize() == contentSize(); -} - -bool IncomingMessage::isReturn(){ - return returned; -} - -bool IncomingMessage::isDelivery(){ - return delivered; -} - -bool IncomingMessage::isResponse(){ - return response; -} - -const string& IncomingMessage::getConsumerTag(){ - if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery"); - return delivered->getConsumerTag(); -} - -u_int64_t IncomingMessage::getDeliveryTag(){ - if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery"); - return delivered->getDeliveryTag(); -} - -AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){ - return header; -} - -void IncomingMessage::getData(string& s){ - int count(content.size()); - for(int i = 0; i < count; i++){ - if(i == 0) s = content[i]->getData(); - else s += content[i]->getData(); - } -} - -u_int64_t IncomingMessage::contentSize(){ - u_int64_t size(0); - u_int64_t count(content.size()); - for(u_int64_t i = 0; i < count; i++){ - size += content[i]->size(); - } - return size; -} diff --git a/Final/cpp/lib/client/IncomingMessage.h b/Final/cpp/lib/client/IncomingMessage.h deleted file mode 100644 index 464e05d877..0000000000 --- a/Final/cpp/lib/client/IncomingMessage.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <vector> -#include <framing/amqp_framing.h> - -#ifndef _IncomingMessage_ -#define _IncomingMessage_ - -#include <ClientMessage.h> - -namespace qpid { -namespace client { - - class IncomingMessage{ - //content will be preceded by one of these method frames - qpid::framing::BasicDeliverBody::shared_ptr delivered; - qpid::framing::BasicReturnBody::shared_ptr returned; - qpid::framing::BasicGetOkBody::shared_ptr response; - qpid::framing::AMQHeaderBody::shared_ptr header; - std::vector<qpid::framing::AMQContentBody::shared_ptr> content; - - u_int64_t contentSize(); - public: - IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro); - IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro); - IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro); - ~IncomingMessage(); - void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); - void addContent(qpid::framing::AMQContentBody::shared_ptr content); - bool isComplete(); - bool isReturn(); - bool isDelivery(); - bool isResponse(); - const std::string& getConsumerTag();//only relevant if isDelivery() - qpid::framing::AMQHeaderBody::shared_ptr& getHeader(); - u_int64_t getDeliveryTag(); - void getData(std::string& data); - }; - -} -} - - -#endif diff --git a/Final/cpp/lib/client/Makefile.am b/Final/cpp/lib/client/Makefile.am deleted file mode 100644 index 9935da654a..0000000000 --- a/Final/cpp/lib/client/Makefile.am +++ /dev/null @@ -1,52 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -AM_CXXFLAGS = $(WARNING_CFLAGS) -INCLUDES = \ - -I$(top_srcdir)/gen \ - -I$(top_srcdir)/lib/common \ - -I$(top_srcdir)/lib/common/sys \ - -I$(top_srcdir)/lib/common/framing \ - $(APR_CXXFLAGS) - -lib_LTLIBRARIES = libqpidclient.la -libqpidclient_la_LIBADD = ../common/libqpidcommon.la -libqpidclient_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG) -libqpidclient_la_SOURCES = \ - ClientChannel.cpp \ - ClientExchange.cpp \ - ClientMessage.cpp \ - ClientQueue.cpp \ - Connection.cpp \ - Connector.cpp \ - IncomingMessage.cpp \ - MessageListener.cpp \ - ResponseHandler.cpp \ - ReturnedMessageHandler.cpp -pkginclude_HEADERS = \ - ClientChannel.h \ - ClientExchange.h \ - ClientMessage.h \ - ClientQueue.h \ - Connection.h \ - Connector.h \ - IncomingMessage.h \ - MessageListener.h \ - MethodBodyInstances.h \ - ResponseHandler.h \ - ReturnedMessageHandler.h diff --git a/Final/cpp/lib/client/MessageListener.cpp b/Final/cpp/lib/client/MessageListener.cpp deleted file mode 100644 index 70d44e7040..0000000000 --- a/Final/cpp/lib/client/MessageListener.cpp +++ /dev/null @@ -1,24 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <MessageListener.h> - -qpid::client::MessageListener::~MessageListener() {} diff --git a/Final/cpp/lib/client/MessageListener.h b/Final/cpp/lib/client/MessageListener.h deleted file mode 100644 index cfb917b4f8..0000000000 --- a/Final/cpp/lib/client/MessageListener.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _MessageListener_ -#define _MessageListener_ - -#include <ClientMessage.h> - -namespace qpid { -namespace client { - - /** - * An interface through which asynchronously delivered messages - * can be received by an application. - * - * @see Channel::consume() - * - * \ingroup clientapi - */ - class MessageListener{ - public: - virtual ~MessageListener(); - virtual void received(Message& msg) = 0; - }; - -} -} - - -#endif diff --git a/Final/cpp/lib/client/MethodBodyInstances.h b/Final/cpp/lib/client/MethodBodyInstances.h deleted file mode 100644 index 3ab0c9af8f..0000000000 --- a/Final/cpp/lib/client/MethodBodyInstances.h +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <framing/amqp_framing.h> - -#ifndef _MethodBodyInstances_h_ -#define _MethodBodyInstances_h_ - -namespace qpid { -namespace client { - -/** - * A list of method body instances that can be used to compare against - * incoming bodies. - */ -class MethodBodyInstances -{ -private: - qpid::framing::ProtocolVersion version; -public: - const qpid::framing::BasicCancelOkBody basic_cancel_ok; - const qpid::framing::BasicConsumeOkBody basic_consume_ok; - const qpid::framing::BasicDeliverBody basic_deliver; - const qpid::framing::BasicGetEmptyBody basic_get_empty; - const qpid::framing::BasicGetOkBody basic_get_ok; - const qpid::framing::BasicQosOkBody basic_qos_ok; - const qpid::framing::BasicReturnBody basic_return; - const qpid::framing::ChannelCloseBody channel_close; - const qpid::framing::ChannelCloseOkBody channel_close_ok; - const qpid::framing::ChannelFlowBody channel_flow; - const qpid::framing::ChannelOpenOkBody channel_open_ok; - const qpid::framing::ConnectionCloseBody connection_close; - const qpid::framing::ConnectionCloseOkBody connection_close_ok; - const qpid::framing::ConnectionOpenOkBody connection_open_ok; - const qpid::framing::ConnectionRedirectBody connection_redirect; - const qpid::framing::ConnectionStartBody connection_start; - const qpid::framing::ConnectionTuneBody connection_tune; - const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok; - const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok; - const qpid::framing::QueueDeclareOkBody queue_declare_ok; - const qpid::framing::QueueDeleteOkBody queue_delete_ok; - const qpid::framing::QueueBindOkBody queue_bind_ok; - const qpid::framing::TxCommitOkBody tx_commit_ok; - const qpid::framing::TxRollbackOkBody tx_rollback_ok; - const qpid::framing::TxSelectOkBody tx_select_ok; - - MethodBodyInstances(u_int8_t major, u_int8_t minor) : - version(major, minor), - basic_cancel_ok(version), - basic_consume_ok(version), - basic_deliver(version), - basic_get_empty(version), - basic_get_ok(version), - basic_qos_ok(version), - basic_return(version), - channel_close(version), - channel_close_ok(version), - channel_flow(version), - channel_open_ok(version), - connection_close(version), - connection_close_ok(version), - connection_open_ok(version), - connection_redirect(version), - connection_start(version), - connection_tune(version), - exchange_declare_ok(version), - exchange_delete_ok(version), - queue_declare_ok(version), - queue_delete_ok(version), - queue_bind_ok(version), - tx_commit_ok(version), - tx_rollback_ok(version), - tx_select_ok(version) - {} - -}; - -static MethodBodyInstances method_bodies(8, 0); - -} // namespace client -} // namespace qpid - -#endif diff --git a/Final/cpp/lib/client/ResponseHandler.cpp b/Final/cpp/lib/client/ResponseHandler.cpp deleted file mode 100644 index ac8b4a9ced..0000000000 --- a/Final/cpp/lib/client/ResponseHandler.cpp +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ResponseHandler.h> -#include <sys/Monitor.h> -#include <QpidError.h> - -using namespace qpid::sys; - -qpid::client::ResponseHandler::ResponseHandler() : waiting(false){} - -qpid::client::ResponseHandler::~ResponseHandler(){} - -bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ - return expected.match(response.get()); -} - -void qpid::client::ResponseHandler::waitForResponse(){ - Monitor::ScopedLock l(monitor); - if(waiting){ - monitor.wait(); - } -} - -void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){ - response = _response; - Monitor::ScopedLock l(monitor); - waiting = false; - monitor.notify(); -} - -void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ - Monitor::ScopedLock l(monitor); - if(waiting){ - monitor.wait(); - } - if(!validate(expected)){ - THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); - } -} - -void qpid::client::ResponseHandler::expect(){ - waiting = true; -} diff --git a/Final/cpp/lib/client/ResponseHandler.h b/Final/cpp/lib/client/ResponseHandler.h deleted file mode 100644 index c3d499d046..0000000000 --- a/Final/cpp/lib/client/ResponseHandler.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <framing/amqp_framing.h> -#include <sys/Monitor.h> - -#ifndef _ResponseHandler_ -#define _ResponseHandler_ - -namespace qpid { - namespace client { - - class ResponseHandler{ - bool waiting; - qpid::framing::AMQMethodBody::shared_ptr response; - qpid::sys::Monitor monitor; - - public: - ResponseHandler(); - ~ResponseHandler(); - inline bool isWaiting(){ return waiting; } - inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; } - bool validate(const qpid::framing::AMQMethodBody& expected); - void waitForResponse(); - void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response); - void receive(const qpid::framing::AMQMethodBody& expected); - void expect();//must be called before calling receive - }; - - } -} - - -#endif diff --git a/Final/cpp/lib/client/ReturnedMessageHandler.cpp b/Final/cpp/lib/client/ReturnedMessageHandler.cpp deleted file mode 100644 index ee9f7462ef..0000000000 --- a/Final/cpp/lib/client/ReturnedMessageHandler.cpp +++ /dev/null @@ -1,24 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <ReturnedMessageHandler.h> - -qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {} diff --git a/Final/cpp/lib/client/ReturnedMessageHandler.h b/Final/cpp/lib/client/ReturnedMessageHandler.h deleted file mode 100644 index 137f0b2e17..0000000000 --- a/Final/cpp/lib/client/ReturnedMessageHandler.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _ReturnedMessageHandler_ -#define _ReturnedMessageHandler_ - -#include <ClientMessage.h> - -namespace qpid { -namespace client { - - /** - * An interface through which returned messages can be received by - * an application. - * - * @see Channel::setReturnedMessageHandler() - * - * \ingroup clientapi - */ - class ReturnedMessageHandler{ - public: - virtual ~ReturnedMessageHandler(); - virtual void returned(Message& msg) = 0; - }; - -} -} - - -#endif |