summaryrefslogtreecommitdiff
path: root/Final/cpp/lib/client
diff options
context:
space:
mode:
Diffstat (limited to 'Final/cpp/lib/client')
-rw-r--r--Final/cpp/lib/client/ClientChannel.cpp434
-rw-r--r--Final/cpp/lib/client/ClientChannel.h321
-rw-r--r--Final/cpp/lib/client/ClientExchange.cpp34
-rw-r--r--Final/cpp/lib/client/ClientExchange.h106
-rw-r--r--Final/cpp/lib/client/ClientMessage.cpp150
-rw-r--r--Final/cpp/lib/client/ClientMessage.h114
-rw-r--r--Final/cpp/lib/client/ClientQueue.cpp58
-rw-r--r--Final/cpp/lib/client/ClientQueue.h104
-rw-r--r--Final/cpp/lib/client/Connection.cpp250
-rw-r--r--Final/cpp/lib/client/Connection.h182
-rw-r--r--Final/cpp/lib/client/Connector.cpp195
-rw-r--r--Final/cpp/lib/client/Connector.h97
-rw-r--r--Final/cpp/lib/client/IncomingMessage.cpp88
-rw-r--r--Final/cpp/lib/client/IncomingMessage.h63
-rw-r--r--Final/cpp/lib/client/Makefile.am52
-rw-r--r--Final/cpp/lib/client/MessageListener.cpp24
-rw-r--r--Final/cpp/lib/client/MessageListener.h49
-rw-r--r--Final/cpp/lib/client/MethodBodyInstances.h100
-rw-r--r--Final/cpp/lib/client/ResponseHandler.cpp61
-rw-r--r--Final/cpp/lib/client/ResponseHandler.h52
-rw-r--r--Final/cpp/lib/client/ReturnedMessageHandler.cpp24
-rw-r--r--Final/cpp/lib/client/ReturnedMessageHandler.h49
22 files changed, 0 insertions, 2607 deletions
diff --git a/Final/cpp/lib/client/ClientChannel.cpp b/Final/cpp/lib/client/ClientChannel.cpp
deleted file mode 100644
index a97d79dcf9..0000000000
--- a/Final/cpp/lib/client/ClientChannel.cpp
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <ClientChannel.h>
-#include <sys/Monitor.h>
-#include <ClientMessage.h>
-#include <QpidError.h>
-#include <MethodBodyInstances.h>
-
-using namespace boost; //to use dynamic_pointer_cast
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-Channel::Channel(bool _transactional, u_int16_t _prefetch) :
- id(0),
- con(0),
- out(0),
- incoming(0),
- closed(true),
- prefetch(_prefetch),
- transactional(_transactional),
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- version(8, 0)
-{ }
-
-Channel::~Channel(){
- stop();
-}
-
-void Channel::setPrefetch(u_int16_t _prefetch){
- prefetch = _prefetch;
- if(con != 0 && out != 0){
- setQos();
- }
-}
-
-void Channel::setQos(){
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
- if(transactional){
- sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok);
- }
-}
-
-void Channel::declareExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- string type = exchange.getType();
- FieldTable args;
- AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.exchange_declare_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::deleteExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.exchange_delete_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::declareQueue(Queue& queue, bool synch){
- string name = queue.getName();
- FieldTable args;
- AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false/*passive*/, queue.isDurable(),
- queue.isExclusive(),
- queue.isAutoDelete(), !synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_declare_ok);
- if(queue.getName().length() == 0){
- QueueDeclareOkBody::shared_ptr response =
- dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
- queue.setName(response->getQueue());
- }
- }else{
- out->send(frame);
- }
-}
-
-void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
- //ticket, queue, ifunused, ifempty, nowait
- string name = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_delete_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
- string e = exchange.getName();
- string q = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_bind_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode, bool noLocal, bool synch, const FieldTable* fields)
-{
- string q = queue.getName();
- AMQFrame* frame =
- new AMQFrame(version,
- id,
- new BasicConsumeBody(
- version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
- if(synch){
- sendAndReceive(frame, method_bodies.basic_consume_ok);
- BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
- tag = response->getConsumerTag();
- }else{
- out->send(frame);
- }
- Consumer* c = new Consumer();
- c->listener = listener;
- c->ackMode = ackMode;
- c->lastDeliveryTag = 0;
- consumers[tag] = c;
-}
-
-void Channel::cancel(std::string& tag, bool synch){
- Consumer* c = consumers[tag];
- if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
- }
-
- AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.basic_cancel_ok);
- }else{
- out->send(frame);
- }
- consumers.erase(tag);
- if(c != 0){
- delete c;
- }
-}
-
-void Channel::cancelAll(){
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
- Consumer* c = i->second;
- if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
- }
- consumers.erase(i);
- delete c;
- }
-}
-
-void Channel::retrieve(Message& msg){
- Monitor::ScopedLock l(retrievalMonitor);
- while(retrieved == 0){
- retrievalMonitor.wait();
- }
-
- msg.header = retrieved->getHeader();
- msg.deliveryTag = retrieved->getDeliveryTag();
- retrieved->getData(msg.data);
- delete retrieved;
- retrieved = 0;
-}
-
-bool Channel::get(Message& msg, const Queue& queue, int ackMode){
- string name = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode));
- responses.expect();
- out->send(frame);
- responses.waitForResponse();
- AMQMethodBody::shared_ptr response = responses.getResponse();
- if(method_bodies.basic_get_ok.match(response.get())){
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
- }
- retrieve(msg);
- return true;
- }if(method_bodies.basic_get_empty.match(response.get())){
- return false;
- }else{
- THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
- }
-}
-
-
-void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
- string e = exchange.getName();
- string key = routingKey;
-
- out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
- //break msg up into header frame and content frame(s) and send these
- string data = msg.getData();
- msg.header->setContentSize(data.length());
- AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
- out->send(new AMQFrame(version, id, body));
-
- u_int64_t data_length = data.length();
- if(data_length > 0){
- u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
- if(data_length < frag_size){
- out->send(new AMQFrame(version, id, new AMQContentBody(data)));
- }else{
- u_int32_t offset = 0;
- u_int32_t remaining = data_length - offset;
- while (remaining > 0) {
- u_int32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
- out->send(new AMQFrame(version, id, new AMQContentBody(frag)));
-
- offset += length;
- remaining = data_length - offset;
- }
- }
- }
-}
-
-void Channel::commit(){
- AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version));
- sendAndReceive(frame, method_bodies.tx_commit_ok);
-}
-
-void Channel::rollback(){
- AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version));
- sendAndReceive(frame, method_bodies.tx_rollback_ok);
-}
-
-void Channel::handleMethod(AMQMethodBody::shared_ptr body){
- //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
- if(method_bodies.basic_deliver.match(body.get())){
- if(incoming != 0){
- std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
- }
- }else if(method_bodies.basic_return.match(body.get())){
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
- }
- }else if(method_bodies.channel_close.match(body.get())){
- con->removeChannel(this);
- //need to signal application that channel has been closed through exception
-
- }else if(method_bodies.channel_flow.match(body.get())){
-
- } else if(responses.isWaiting()){
- responses.signalResponse(body);
- }else{
- //signal error
- std::cout << "Unhandled method: " << *body << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
- }
-}
-
-void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
- }else{
- incoming->setHeader(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
-}
-
-void Channel::handleContent(AMQContentBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
- }else{
- incoming->addContent(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
-}
-
-void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
-}
-
-void Channel::start(){
- dispatcher = Thread(this);
-}
-
-void Channel::stop(){
- {
- Monitor::ScopedLock l(dispatchMonitor);
- closed = true;
- dispatchMonitor.notify();
- }
- dispatcher.join();
-}
-
-void Channel::run(){
- dispatch();
-}
-
-void Channel::enqueue(){
- if(incoming->isResponse()){
- Monitor::ScopedLock l(retrievalMonitor);
- retrieved = incoming;
- retrievalMonitor.notify();
- }else{
- Monitor::ScopedLock l(dispatchMonitor);
- messages.push(incoming);
- dispatchMonitor.notify();
- }
- incoming = 0;
-}
-
-IncomingMessage* Channel::dequeue(){
- Monitor::ScopedLock l(dispatchMonitor);
- while(messages.empty() && !closed){
- dispatchMonitor.wait();
- }
- IncomingMessage* msg = 0;
- if(!messages.empty()){
- msg = messages.front();
- messages.pop();
- }
- return msg;
-}
-
-void Channel::deliver(Consumer* consumer, Message& msg){
- //record delivery tag:
- consumer->lastDeliveryTag = msg.getDeliveryTag();
-
- //allow registered listener to handle the message
- consumer->listener->received(msg);
-
- //if the handler calls close on the channel or connection while
- //handling this message, then consumer will now have been deleted.
- if(!closed){
- bool multiple(false);
- switch(consumer->ackMode){
- case LAZY_ACK:
- multiple = true;
- if(++(consumer->count) < prefetch) break;
- //else drop-through
- case AUTO_ACK:
- out->send(new AMQFrame(version, id, new BasicAckBody(version, msg.getDeliveryTag(), multiple)));
- consumer->lastDeliveryTag = 0;
- }
- }
-
- //as it stands, transactionality is entirely orthogonal to ack
- //mode, though the acks will not be processed by the broker under
- //a transaction until it commits.
-}
-
-void Channel::dispatch(){
- while(!closed){
- IncomingMessage* incomingMsg = dequeue();
- if(incomingMsg){
- //Note: msg is currently only valid for duration of this call
- Message msg(incomingMsg->getHeader());
- incomingMsg->getData(msg.data);
- if(incomingMsg->isReturn()){
- if(returnsHandler == 0){
- //print warning to log/console
- std::cout << "Message returned: " << msg.getData() << std::endl;
- }else{
- returnsHandler->returned(msg);
- }
- }else{
- msg.deliveryTag = incomingMsg->getDeliveryTag();
- std::string tag = incomingMsg->getConsumerTag();
-
- if(consumers[tag] == 0){
- //signal error
- std::cout << "Unknown consumer: " << tag << std::endl;
- }else{
- deliver(consumers[tag], msg);
- }
- }
- delete incomingMsg;
- }
- }
-}
-
-void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
- returnsHandler = handler;
-}
-
-void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
- out->send(frame);
- responses.receive(body);
-}
-
-void Channel::close(){
- if(con != 0){
- con->closeChannel(this);
- }
-}
diff --git a/Final/cpp/lib/client/ClientChannel.h b/Final/cpp/lib/client/ClientChannel.h
deleted file mode 100644
index 066f837430..0000000000
--- a/Final/cpp/lib/client/ClientChannel.h
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <map>
-#include <string>
-#include <queue>
-#include "sys/types.h"
-
-#ifndef _Channel_
-#define _Channel_
-
-#include <framing/amqp_framing.h>
-#include <Connection.h>
-#include <ClientExchange.h>
-#include <IncomingMessage.h>
-#include <ClientMessage.h>
-#include <MessageListener.h>
-#include <ClientQueue.h>
-#include <ResponseHandler.h>
-#include <ReturnedMessageHandler.h>
-
-namespace qpid {
-namespace client {
- /**
- * The available acknowledgements modes
- *
- * \ingroup clientapi
- */
- enum ack_modes {
- /** No acknowledgement will be sent, broker can
- discard messages as soon as they are delivered
- to a consumer using this mode. **/
- NO_ACK = 0,
- /** Each message will be automatically
- acknowledged as soon as it is delivered to the
- application **/
- AUTO_ACK = 1,
- /** Acknowledgements will be sent automatically,
- but not for each message. **/
- LAZY_ACK = 2,
- /** The application is responsible for explicitly
- acknowledging messages. **/
- CLIENT_ACK = 3
- };
-
- /**
- * Represents an AMQP channel, i.e. loosely a session of work. It
- * is through a channel that most of the AMQP 'methods' are
- * exposed.
- *
- * \ingroup clientapi
- */
- class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::sys::Runnable{
- struct Consumer{
- MessageListener* listener;
- int ackMode;
- int count;
- u_int64_t lastDeliveryTag;
- };
- typedef std::map<std::string,Consumer*>::iterator consumer_iterator;
-
- u_int16_t id;
- Connection* con;
- qpid::sys::Thread dispatcher;
- qpid::framing::OutputHandler* out;
- IncomingMessage* incoming;
- ResponseHandler responses;
- std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
- IncomingMessage* retrieved;//holds response to basic.get
- qpid::sys::Monitor dispatchMonitor;
- qpid::sys::Monitor retrievalMonitor;
- std::map<std::string, Consumer*> consumers;
- ReturnedMessageHandler* returnsHandler;
- bool closed;
-
- u_int16_t prefetch;
- const bool transactional;
- qpid::framing::ProtocolVersion version;
-
- void enqueue();
- void retrieve(Message& msg);
- IncomingMessage* dequeue();
- void dispatch();
- void stop();
- void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
- void deliver(Consumer* consumer, Message& msg);
- void setQos();
- void cancelAll();
-
- virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
- virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
- virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
- virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
-
- public:
- /**
- * Creates a channel object.
- *
- * @param transactional if true, the publishing and acknowledgement
- * of messages will be transactional and can be committed or
- * aborted in atomic units (@see commit(), @see rollback())
- *
- * @param prefetch specifies the number of unacknowledged
- * messages the channel is willing to have sent to it
- * asynchronously
- */
- Channel(bool transactional = false, u_int16_t prefetch = 500);
- ~Channel();
-
- /**
- * Declares an exchange.
- *
- * In AMQP Exchanges are the destinations to which messages
- * are published. They have Queues bound to them and route
- * messages they receive to those queues. The routing rules
- * depend on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to declare
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void declareExchange(Exchange& exchange, bool synch = true);
- /**
- * Deletes an exchange
- *
- * @param exchange an Exchange object representing the exchange to delete
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void deleteExchange(Exchange& exchange, bool synch = true);
- /**
- * Declares a Queue
- *
- * @param queue a Queue object representing the queue to declare
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void declareQueue(Queue& queue, bool synch = true);
- /**
- * Deletes a Queue
- *
- * @param queue a Queue object representing the queue to delete
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
- /**
- * Binds a queue to an exchange. The exact semantics of this
- * (in particular how 'routing keys' and 'binding arguments'
- * are used) depends on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to bind to
- *
- * @param queue a Queue object representing the queue to be
- * bound
- *
- * @param key the 'routing key' for the binding
- *
- * @param args the 'binding arguments' for the binding
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
- const qpid::framing::FieldTable& args, bool synch = true);
- /**
- * Creates a 'consumer' for a queue. Messages in (or arriving
- * at) that queue will be delivered to consumers
- * asynchronously.
- *
- * @param queue a Queue instance representing the queue to
- * consume from
- *
- * @param tag an identifier to associate with the consumer
- * that can be used to cancel its subscription (if empty, this
- * will be assigned by the broker)
- *
- * @param listener a pointer to an instance of an
- * implementation of the MessageListener interface. Messages
- * received from this queue for this consumer will result in
- * invocation of the received() method on the listener, with
- * the message itself passed in.
- *
- * @param ackMode the mode of acknowledgement that the broker
- * should assume for this consumer. @see ack_modes
- *
- * @param noLocal if true, this consumer will not be sent any
- * message published by this connection
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const qpid::framing::FieldTable* fields = 0);
-
- /**
- * Cancels a subscription previously set up through a call to consume().
- *
- * @param tag the identifier used (or assigned) in the consume
- * request that set up the subscription to be cancelled.
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void cancel(std::string& tag, bool synch = true);
- /**
- * Synchronous pull of a message from a queue.
- *
- * @param msg a message object that will contain the message
- * headers and content if the call completes.
- *
- * @param queue the queue to consume from
- *
- * @param ackMode the acknowledgement mode to use (@see
- * ack_modes)
- *
- * @return true if a message was succcessfully dequeued from
- * the queue, false if the queue was empty.
- */
- bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
- /**
- * Publishes (i.e. sends a message to the broker).
- *
- * @param msg the message to publish
- *
- * @param exchange the exchange to publish the message to
- *
- * @param routingKey the routing key to publish with
- *
- * @param mandatory if true and the exchange to which this
- * publish is directed has no matching bindings, the message
- * will be returned (see setReturnedMessageHandler()).
- *
- * @param immediate if true and there is no consumer to
- * receive this message on publication, the message will be
- * returned (see setReturnedMessageHandler()).
- */
- void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
-
- /**
- * For a transactional channel this will commit all
- * publications and acknowledgements since the last commit (or
- * the channel was opened if there has been no previous
- * commit). This will cause published messages to become
- * available to consumers and acknowledged messages to be
- * consumed and removed from the queues they were dispatched
- * from.
- *
- * Transactionailty of a channel is specified when the channel
- * object is created (@see Channel()).
- */
- void commit();
- /**
- * For a transactional channel, this will rollback any
- * publications or acknowledgements. It will be as if the
- * ppblished messages were never sent and the acknowledged
- * messages were never consumed.
- */
- void rollback();
-
- /**
- * Change the prefetch in use.
- */
- void setPrefetch(u_int16_t prefetch);
-
- /**
- * Start message dispatching on a new thread
- */
- void start();
- /**
- * Do message dispatching on this thread
- */
- void run();
-
- /**
- * Closes a channel, stopping any message dispatching.
- */
- void close();
-
- /**
- * Set a handler for this channel that will process any
- * returned messages
- *
- * @see publish()
- */
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- friend class Connection;
- };
-
-}
-}
-
-
-#endif
diff --git a/Final/cpp/lib/client/ClientExchange.cpp b/Final/cpp/lib/client/ClientExchange.cpp
deleted file mode 100644
index 5e5f3f14c6..0000000000
--- a/Final/cpp/lib/client/ClientExchange.cpp
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <ClientExchange.h>
-
-qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){}
-const std::string& qpid::client::Exchange::getName() const { return name; }
-const std::string& qpid::client::Exchange::getType() const { return type; }
-
-const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct";
-const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic";
-const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers";
-
-const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE);
-const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE);
-const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE);
-const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE);
diff --git a/Final/cpp/lib/client/ClientExchange.h b/Final/cpp/lib/client/ClientExchange.h
deleted file mode 100644
index a8ac21fa9b..0000000000
--- a/Final/cpp/lib/client/ClientExchange.h
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-
-#ifndef _Exchange_
-#define _Exchange_
-
-namespace qpid {
-namespace client {
-
- /**
- * A 'handle' used to represent an AMQP exchange in the Channel
- * methods. Exchanges are the destinations to which messages are
- * published.
- *
- * There are different types of exchange (the standard types are
- * available as static constants, see DIRECT_EXCHANGE,
- * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to
- * an exchange using Channel::bind() and messages published to
- * that exchange are then routed to the queue based on the details
- * of the binding and the type of exchange.
- *
- * There are some standard exchange instances that are predeclared
- * on all AMQP brokers. These are defined as static members
- * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and
- * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange
- * (member DEFAULT_EXCHANGE) which is nameless and of type
- * 'direct' and has every declared queue bound to it by queue
- * name.
- *
- * \ingroup clientapi
- */
- class Exchange{
- const std::string name;
- const std::string type;
-
- public:
- /**
- * A direct exchange routes messages published with routing
- * key X to any queue bound with key X (i.e. an exact match is
- * used).
- */
- static const std::string DIRECT_EXCHANGE;
- /**
- * A topic exchange treat the key with which a queue is bound
- * as a pattern and routes all messages whose routing keys
- * match that pattern to the bound queue. The routing key for
- * a message must consist of zero or more alpha-numeric words
- * delimited by dots. The pattern is of a similar form but *
- * can be used to match excatly one word and # can be used to
- * match zero or more words.
- */
- static const std::string TOPIC_EXCHANGE;
- /**
- * The headers exchange routes messages based on whether their
- * headers match the binding arguments specified when
- * binding. (see the AMQP spec for more details).
- */
- static const std::string HEADERS_EXCHANGE;
-
- /**
- * The 'default' exchange, nameless and of type 'direct'. Has
- * every declared queue bound to it by name.
- */
- static const Exchange DEFAULT_EXCHANGE;
- /**
- * The standard direct exchange, named amq.direct.
- */
- static const Exchange STANDARD_DIRECT_EXCHANGE;
- /**
- * The standard topic exchange, named amq.topic.
- */
- static const Exchange STANDARD_TOPIC_EXCHANGE;
- /**
- * The standard headers exchange, named amq.header.
- */
- static const Exchange STANDARD_HEADERS_EXCHANGE;
-
- Exchange(std::string name, std::string type = DIRECT_EXCHANGE);
- const std::string& getName() const;
- const std::string& getType() const;
- };
-
-}
-}
-
-
-#endif
diff --git a/Final/cpp/lib/client/ClientMessage.cpp b/Final/cpp/lib/client/ClientMessage.cpp
deleted file mode 100644
index e8a2a6019e..0000000000
--- a/Final/cpp/lib/client/ClientMessage.cpp
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <ClientMessage.h>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-
-Message::Message(){
- header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC));
-}
-
-Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){
-}
-
-Message::~Message(){
-}
-
-BasicHeaderProperties* Message::getHeaderProperties(){
- return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
-}
-
-const std::string& Message::getContentType(){
- return getHeaderProperties()->getContentType();
-}
-
-const std::string& Message::getContentEncoding(){
- return getHeaderProperties()->getContentEncoding();
-}
-
-FieldTable& Message::getHeaders(){
- return getHeaderProperties()->getHeaders();
-}
-
-u_int8_t Message::getDeliveryMode(){
- return getHeaderProperties()->getDeliveryMode();
-}
-
-u_int8_t Message::getPriority(){
- return getHeaderProperties()->getPriority();
-}
-
-const std::string& Message::getCorrelationId(){
- return getHeaderProperties()->getCorrelationId();
-}
-
-const std::string& Message::getReplyTo(){
- return getHeaderProperties()->getReplyTo();
-}
-
-const std::string& Message::getExpiration(){
- return getHeaderProperties()->getExpiration();
-}
-
-const std::string& Message::getMessageId(){
- return getHeaderProperties()->getMessageId();
-}
-
-u_int64_t Message::getTimestamp(){
- return getHeaderProperties()->getTimestamp();
-}
-
-const std::string& Message::getType(){
- return getHeaderProperties()->getType();
-}
-
-const std::string& Message::getUserId(){
- return getHeaderProperties()->getUserId();
-}
-
-const std::string& Message::getAppId(){
- return getHeaderProperties()->getAppId();
-}
-
-const std::string& Message::getClusterId(){
- return getHeaderProperties()->getClusterId();
-}
-
-void Message::setContentType(const std::string& type){
- getHeaderProperties()->setContentType(type);
-}
-
-void Message::setContentEncoding(const std::string& encoding){
- getHeaderProperties()->setContentEncoding(encoding);
-}
-
-void Message::setHeaders(const FieldTable& headers){
- getHeaderProperties()->setHeaders(headers);
-}
-
-void Message::setDeliveryMode(u_int8_t mode){
- getHeaderProperties()->setDeliveryMode(mode);
-}
-
-void Message::setPriority(u_int8_t priority){
- getHeaderProperties()->setPriority(priority);
-}
-
-void Message::setCorrelationId(const std::string& correlationId){
- getHeaderProperties()->setCorrelationId(correlationId);
-}
-
-void Message::setReplyTo(const std::string& replyTo){
- getHeaderProperties()->setReplyTo(replyTo);
-}
-
-void Message::setExpiration(const std::string& expiration){
- getHeaderProperties()->setExpiration(expiration);
-}
-
-void Message::setMessageId(const std::string& messageId){
- getHeaderProperties()->setMessageId(messageId);
-}
-
-void Message::setTimestamp(u_int64_t timestamp){
- getHeaderProperties()->setTimestamp(timestamp);
-}
-
-void Message::setType(const std::string& type){
- getHeaderProperties()->setType(type);
-}
-
-void Message::setUserId(const std::string& userId){
- getHeaderProperties()->setUserId(userId);
-}
-
-void Message::setAppId(const std::string& appId){
- getHeaderProperties()->setAppId(appId);
-}
-
-void Message::setClusterId(const std::string& clusterId){
- getHeaderProperties()->setClusterId(clusterId);
-}
diff --git a/Final/cpp/lib/client/ClientMessage.h b/Final/cpp/lib/client/ClientMessage.h
deleted file mode 100644
index b46eb0bc72..0000000000
--- a/Final/cpp/lib/client/ClientMessage.h
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-#include <framing/amqp_framing.h>
-
-#ifndef _Message_
-#define _Message_
-
-
-namespace qpid {
-namespace client {
-
- /**
- * A representation of messages for sent or recived through the
- * client api.
- *
- * \ingroup clientapi
- */
- class Message{
- qpid::framing::AMQHeaderBody::shared_ptr header;
- std::string data;
- bool redelivered;
- u_int64_t deliveryTag;
-
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
- Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
- public:
- Message();
- ~Message();
-
- /**
- * Allows the application to access the content of messages
- * received.
- *
- * @return a string representing the data of the message
- */
- inline std::string getData(){ return data; }
- /**
- * Allows the application to set the content of messages to be
- * sent.
- *
- * @param data a string representing the data of the message
- */
- inline void setData(const std::string& _data){ data = _data; }
-
- /**
- * @return true if this message was delivered previously (to
- * any consumer) but was not acknowledged.
- */
- inline bool isRedelivered(){ return redelivered; }
- inline void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
-
- inline u_int64_t getDeliveryTag(){ return deliveryTag; }
-
- const std::string& getContentType();
- const std::string& getContentEncoding();
- qpid::framing::FieldTable& getHeaders();
- u_int8_t getDeliveryMode();
- u_int8_t getPriority();
- const std::string& getCorrelationId();
- const std::string& getReplyTo();
- const std::string& getExpiration();
- const std::string& getMessageId();
- u_int64_t getTimestamp();
- const std::string& getType();
- const std::string& getUserId();
- const std::string& getAppId();
- const std::string& getClusterId();
-
- void setContentType(const std::string& type);
- void setContentEncoding(const std::string& encoding);
- void setHeaders(const qpid::framing::FieldTable& headers);
- /**
- * Sets the delivery mode. 1 = non-durable, 2 = durable.
- */
- void setDeliveryMode(u_int8_t mode);
- void setPriority(u_int8_t priority);
- void setCorrelationId(const std::string& correlationId);
- void setReplyTo(const std::string& replyTo);
- void setExpiration(const std::string& expiration);
- void setMessageId(const std::string& messageId);
- void setTimestamp(u_int64_t timestamp);
- void setType(const std::string& type);
- void setUserId(const std::string& userId);
- void setAppId(const std::string& appId);
- void setClusterId(const std::string& clusterId);
-
-
- friend class Channel;
- };
-
-}
-}
-
-
-#endif
diff --git a/Final/cpp/lib/client/ClientQueue.cpp b/Final/cpp/lib/client/ClientQueue.cpp
deleted file mode 100644
index 773be504d8..0000000000
--- a/Final/cpp/lib/client/ClientQueue.cpp
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <ClientQueue.h>
-
-qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){}
-
-qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){}
-
-qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){}
-
-qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable)
- : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){}
-
-const std::string& qpid::client::Queue::getName() const{
- return name;
-}
-
-void qpid::client::Queue::setName(const std::string& _name){
- name = _name;
-}
-
-bool qpid::client::Queue::isAutoDelete() const{
- return autodelete;
-}
-
-bool qpid::client::Queue::isExclusive() const{
- return exclusive;
-}
-
-bool qpid::client::Queue::isDurable() const{
- return durable;
-}
-
-void qpid::client::Queue::setDurable(bool _durable){
- durable = _durable;
-}
-
-
-
-
diff --git a/Final/cpp/lib/client/ClientQueue.h b/Final/cpp/lib/client/ClientQueue.h
deleted file mode 100644
index 4a63097c55..0000000000
--- a/Final/cpp/lib/client/ClientQueue.h
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-
-#ifndef _Queue_
-#define _Queue_
-
-namespace qpid {
-namespace client {
-
- /**
- * A 'handle' used to represent an AMQP queue in the Channel
- * methods. Creating an instance of this class does not cause the
- * queue to be created on the broker. Rather, an instance of this
- * class should be passed to Channel::declareQueue() to ensure
- * that the queue exists or is created.
- *
- * Queues hold messages and allow clients to consume
- * (see Channel::consume()) or get (see Channel::get()) those messags. A
- * queue receives messages by being bound to one or more Exchange;
- * messages published to that exchange may then be routed to the
- * queue based on the details of the binding and the type of the
- * exchange (see Channel::bind()).
- *
- * Queues are identified by a name. They can be exclusive (in which
- * case they can only be used in the context of the connection
- * over which they were declared, and are deleted when then
- * connection closes), or they can be shared. Shared queues can be
- * auto deleted when they have no consumers.
- *
- * We use the term 'temporary queue' to refer to an exclusive
- * queue.
- *
- * \ingroup clientapi
- */
- class Queue{
- std::string name;
- const bool autodelete;
- const bool exclusive;
- bool durable;
-
- public:
-
- /**
- * Creates an unnamed, non-durable, temporary queue. A name
- * will be assigned to this queue instance by a call to
- * Channel::declareQueue().
- */
- Queue();
- /**
- * Creates a shared, non-durable, queue with a given name,
- * that will not be autodeleted.
- *
- * @param name the name of the queue
- */
- Queue(std::string name);
- /**
- * Creates a non-durable queue with a given name.
- *
- * @param name the name of the queue
- *
- * @param temp if true the queue will be a temporary queue, if
- * false it will be shared and not autodeleted.
- */
- Queue(std::string name, bool temp);
- /**
- * This constructor allows the autodelete, exclusive and
- * durable propeties to be explictly set. Note however that if
- * exclusive is true, autodelete has no meaning as exclusive
- * queues are always destroyed when the connection that
- * created them is closed.
- */
- Queue(std::string name, bool autodelete, bool exclusive, bool durable);
- const std::string& getName() const;
- void setName(const std::string&);
- bool isAutoDelete() const;
- bool isExclusive() const;
- bool isDurable() const;
- void setDurable(bool durable);
- };
-
-}
-}
-
-
-#endif
diff --git a/Final/cpp/lib/client/Connection.cpp b/Final/cpp/lib/client/Connection.cpp
deleted file mode 100644
index c00b58a4a9..0000000000
--- a/Final/cpp/lib/client/Connection.cpp
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <Connection.h>
-#include <ClientChannel.h>
-#include <ClientMessage.h>
-#include <QpidError.h>
-#include <iostream>
-#include <MethodBodyInstances.h>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using namespace qpid::sys;
-
-Connection::Connection( bool _debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) :
- debug(_debug),
- channelIdCounter(0),
- max_frame_size(_max_frame_size),
- closed(true),
- version(_version->getMajor(),_version->getMinor()),
- tcpNoDelay(false)
-{
- connector = new Connector(version, debug, _max_frame_size);
-}
-
-Connection::~Connection(){
- delete connector;
-}
-
-void Connection::setTcpNoDelay(bool on) {
- tcpNoDelay = on;
-}
-
-void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
- host = _host;
- port = _port;
- connector->setInputHandler(this);
- connector->setTimeoutHandler(this);
- connector->setShutdownHandler(this);
- out = connector->getOutputHandler();
- connector->connect(host, port, tcpNoDelay);
-
- ProtocolInitiation* header = new ProtocolInitiation(version);
- responses.expect();
- connector->init(header);
- responses.receive(method_bodies.connection_start);
-
- FieldTable props;
- string mechanism("PLAIN");
- string response = ((char)0) + uid + ((char)0) + pwd;
- string locale("en_US");
- responses.expect();
- out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
-
- /**
- * Assume for now that further challenges will not be required
- //receive connection.secure
- responses.receive(connection_secure));
- //send connection.secure-ok
- out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
- **/
-
- responses.receive(method_bodies.connection_tune);
-
- ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
- out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
-
- u_int16_t heartbeat = proposal->getHeartbeat();
- connector->setReadTimeout(heartbeat * 2);
- connector->setWriteTimeout(heartbeat);
-
- //send connection.open
- string capabilities;
- string vhost = virtualhost;
- responses.expect();
- out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true)));
- //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
- responses.waitForResponse();
- if(responses.validate(method_bodies.connection_open_ok)){
- //ok
- }else if(responses.validate(method_bodies.connection_redirect)){
- //ignore for now
- ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
- std::cout << "Received redirection to " << redirect->getHost() << std::endl;
- }else{
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
- }
- closed = false;
-}
-
-void Connection::close(){
- if(!closed){
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
-
- sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
- connector->close();
- closed = true;
- }
-}
-
-void Connection::openChannel(Channel* channel){
- channel->con = this;
- channel->id = ++channelIdCounter;
- channel->out = out;
- channels[channel->id] = channel;
- //now send frame to open channel and wait for response
- string oob;
- channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
- channel->setQos();
- channel->closed = false;
-}
-
-void Connection::closeChannel(Channel* channel){
- //send frame to close channel
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
- closeChannel(channel, code, text, classId, methodId);
-}
-
-void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
- //send frame to close channel
- channel->cancelAll();
- channel->closed = true;
- channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
- channel->con = 0;
- channel->out = 0;
- removeChannel(channel);
-}
-
-void Connection::removeChannel(Channel* channel){
- //send frame to close channel
-
- channels.erase(channel->id);
- channel->out = 0;
- channel->id = 0;
- channel->con = 0;
-}
-
-void Connection::received(AMQFrame* frame){
- u_int16_t channelId = frame->getChannel();
-
- if(channelId == 0){
- this->handleBody(frame->getBody());
- }else{
- Channel* channel = channels[channelId];
- if(channel == 0){
- error(504, "Unknown channel");
- }else{
- try{
- channel->handleBody(frame->getBody());
- }catch(qpid::QpidError e){
- channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
- }
- }
- }
-}
-
-void Connection::handleMethod(AMQMethodBody::shared_ptr body){
- //connection.close, basic.deliver, basic.return or a response to a synchronous request
- if(responses.isWaiting()){
- responses.signalResponse(body);
- }else if(method_bodies.connection_close.match(body.get())){
- //send back close ok
- //close socket
- ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
- std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl;
- connector->close();
- }else{
- std::cout << "Unhandled method for connection: " << *body << std::endl;
- error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
- }
-}
-
-void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){
- error(504, "Channel error: received header body with channel 0.");
-}
-
-void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){
- error(504, "Channel error: received content body with channel 0.");
-}
-
-void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
-}
-
-void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
- out->send(frame);
- responses.receive(body);
-}
-
-void Connection::error(int code, const string& msg, int classid, int methodid){
- std::cout << "Connection exception generated: " << code << msg;
- if(classid || methodid){
- std::cout << " [" << methodid << ":" << classid << "]";
- }
- std::cout << std::endl;
- sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
- connector->close();
-}
-
-void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
- std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl;
- int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
- string msg = e.msg;
- if(method == 0){
- closeChannel(channel, code, msg);
- }else{
- closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
- }
-}
-
-void Connection::idleIn(){
- std::cout << "Connection timed out due to abscence of heartbeat." << std::endl;
- connector->close();
-}
-
-void Connection::idleOut(){
- out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
-}
-
-void Connection::shutdown(){
- closed = true;
- //close all channels
- for(iterator i = channels.begin(); i != channels.end(); i++){
- i->second->stop();
- }
-}
diff --git a/Final/cpp/lib/client/Connection.h b/Final/cpp/lib/client/Connection.h
deleted file mode 100644
index 2222250188..0000000000
--- a/Final/cpp/lib/client/Connection.h
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <map>
-#include <string>
-
-#ifndef _Connection_
-#define _Connection_
-
-#include <QpidError.h>
-#include <Connector.h>
-#include <sys/ShutdownHandler.h>
-#include <sys/TimeoutHandler.h>
-
-#include <framing/amqp_framing.h>
-#include <ClientExchange.h>
-#include <IncomingMessage.h>
-#include <ClientMessage.h>
-#include <MessageListener.h>
-#include <ClientQueue.h>
-#include <ResponseHandler.h>
-#include <AMQP_HighestVersion.h>
-
-namespace qpid {
-
- /**
- * The client namespace contains all classes that make up a client
- * implementation of the AMQP protocol. The key classes that form
- * the basis of the client API to be used by applications are
- * Connection and Channel.
- */
-namespace client {
-
- class Channel;
-
- /**
- * \defgroup clientapi Application API for an AMQP client
- */
-
- /**
- * Represents a connection to an AMQP broker. All communication is
- * initiated by establishing a connection, then opening one or
- * more Channels over that connection.
- *
- * \ingroup clientapi
- */
- class Connection : public virtual qpid::framing::InputHandler,
- public virtual qpid::sys::TimeoutHandler,
- public virtual qpid::sys::ShutdownHandler,
- private virtual qpid::framing::BodyHandler{
-
- typedef std::map<int, Channel*>::iterator iterator;
-
- const bool debug;
- u_int16_t channelIdCounter;
-
- std::string host;
- int port;
- const u_int32_t max_frame_size;
- std::map<int, Channel*> channels;
- Connector* connector;
- qpid::framing::OutputHandler* out;
- ResponseHandler responses;
- volatile bool closed;
- qpid::framing::ProtocolVersion version;
- bool tcpNoDelay;
-
- void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
- void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
- void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
- void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
-
- virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
- virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
- virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
- virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
-
- public:
- /**
- * Creates a connection object, but does not open the
- * connection.
- *
- * @param _version the version of the protocol to connect with
- *
- * @param debug turns on tracing for the connection
- * (i.e. prints details of the frames sent and received to std
- * out). Optional and defaults to false.
- *
- * @param max_frame_size the maximum frame size that the
- * client will accept. Optional and defaults to 65536.
- */
- Connection( bool debug = false, u_int32_t max_frame_size = 65536,
- qpid::framing::ProtocolVersion* _version = &(qpid::framing::highestProtocolVersion));
- ~Connection();
-
- void setTcpNoDelay(bool on);
-
- /**
- * Opens a connection to a broker.
- *
- * @param host the host on which the broker is running
- *
- * @param port the port on the which the broker is listening
- *
- * @param uid the userid to connect with
- *
- * @param pwd the password to connect with (currently SASL
- * PLAIN is the only authentication method supported so this
- * is sent in clear text)
- *
- * @param virtualhost the AMQP virtual host to use (virtual
- * hosts, where implemented(!), provide namespace partitioning
- * within a single broker).
- */
- void open(const std::string& host, int port = 5672,
- const std::string& uid = "guest", const std::string& pwd = "guest",
- const std::string& virtualhost = "");
- /**
- * Closes the connection. Any further use of this connection
- * (without reopening it) will not succeed.
- */
- void close();
- /**
- * Opens a Channel. In AMQP channels are like multi-plexed
- * 'sessions' of work over a connection. Almost all the
- * interaction with AMQP is done over a channel.
- *
- * @param channel a pointer to a channel instance that will be
- * used to represent the new channel.
- */
- void openChannel(Channel* channel);
- /*
- * Requests that the server close this channel, then removes
- * the association to the channel from this connection
- *
- * @param channel a pointer to the channel instance to close
- */
- void closeChannel(Channel* channel);
- /*
- * Removes the channel from association with this connection,
- * without sending a close request to the server.
- *
- * @param channel a pointer to the channel instance to
- * disassociate
- */
- void removeChannel(Channel* channel);
-
- virtual void received(qpid::framing::AMQFrame* frame);
-
- virtual void idleOut();
- virtual void idleIn();
-
- virtual void shutdown();
-
- /**
- * @return the maximum frame size in use on this connection
- */
- inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
- };
-
-}
-}
-
-
-#endif
diff --git a/Final/cpp/lib/client/Connector.cpp b/Final/cpp/lib/client/Connector.cpp
deleted file mode 100644
index a99360b840..0000000000
--- a/Final/cpp/lib/client/Connector.cpp
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-#include <QpidError.h>
-#include <sys/Time.h>
-#include "Connector.h"
-
-using namespace qpid::sys;
-using namespace qpid::client;
-using namespace qpid::framing;
-using qpid::QpidError;
-
-Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) :
- debug(_debug),
- receive_buffer_size(buffer_size),
- send_buffer_size(buffer_size),
- version(pVersion),
- closed(true),
- lastIn(0), lastOut(0),
- timeout(0),
- idleIn(0), idleOut(0),
- timeoutHandler(0),
- shutdownHandler(0),
- inbuf(receive_buffer_size),
- outbuf(send_buffer_size){ }
-
-Connector::~Connector(){ }
-
-void Connector::connect(const std::string& host, int port, bool tcpNoDelay){
- socket = Socket::createTcp();
- if (tcpNoDelay) {
- socket.setTcpNoDelay(true);
- }
- socket.connect(host, port);
- closed = false;
- receiver = Thread(this);
-}
-
-void Connector::init(ProtocolInitiation* header){
- writeBlock(header);
- delete header;
-}
-
-void Connector::close(){
- if (markClosed()) {
- socket.close();
- receiver.join();
- }
-}
-
-void Connector::setInputHandler(InputHandler* handler){
- input = handler;
-}
-
-void Connector::setShutdownHandler(ShutdownHandler* handler){
- shutdownHandler = handler;
-}
-
-OutputHandler* Connector::getOutputHandler(){
- return this;
-}
-
-void Connector::send(AMQFrame* frame){
- writeBlock(frame);
- if(debug) std::cout << "SENT: " << *frame << std::endl;
- delete frame;
-}
-
-void Connector::writeBlock(AMQDataBlock* data){
- Mutex::ScopedLock l(writeLock);
- data->encode(outbuf);
- //transfer data to wire
- outbuf.flip();
- writeToSocket(outbuf.start(), outbuf.available());
- outbuf.clear();
-}
-
-void Connector::writeToSocket(char* data, size_t available){
- size_t written = 0;
- while(written < available && !closed){
- ssize_t sent = socket.send(data + written, available-written);
- if(sent > 0) {
- lastOut = now() * TIME_MSEC;
- written += sent;
- }
- }
-}
-
-void Connector::handleClosed(){
- if (markClosed()) {
- socket.close();
- if(shutdownHandler) shutdownHandler->shutdown();
- }
-}
-
-bool Connector::markClosed(){
- if (closed) {
- return false;
- } else {
- closed = true;
- return true;
- }
-}
-
-void Connector::checkIdle(ssize_t status){
- if(timeoutHandler){
- Time t = now() * TIME_MSEC;
- if(status == Socket::SOCKET_TIMEOUT) {
- if(idleIn && (t - lastIn > idleIn)){
- timeoutHandler->idleIn();
- }
- }else if(status == Socket::SOCKET_EOF){
- handleClosed();
- }else{
- lastIn = t;
- }
- if(idleOut && (t - lastOut > idleOut)){
- timeoutHandler->idleOut();
- }
- }
-}
-
-void Connector::setReadTimeout(u_int16_t t){
- idleIn = t * 1000;//t is in secs
- if(idleIn && (!timeout || idleIn < timeout)){
- timeout = idleIn;
- setSocketTimeout();
- }
-
-}
-
-void Connector::setWriteTimeout(u_int16_t t){
- idleOut = t * 1000;//t is in secs
- if(idleOut && (!timeout || idleOut < timeout)){
- timeout = idleOut;
- setSocketTimeout();
- }
-}
-
-void Connector::setSocketTimeout(){
- socket.setTimeout(timeout*TIME_MSEC);
-}
-
-void Connector::setTimeoutHandler(TimeoutHandler* handler){
- timeoutHandler = handler;
-}
-
-void Connector::run(){
- try{
- while(!closed){
- ssize_t available = inbuf.available();
- if(available < 1){
- THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
- }
- ssize_t received = socket.recv(inbuf.start(), available);
- checkIdle(received);
-
- if(!closed && received > 0){
- inbuf.move(received);
- inbuf.flip();//position = 0, limit = total data read
-
- AMQFrame frame(version);
- while(frame.decode(inbuf)){
- if(debug) std::cout << "RECV: " << frame << std::endl;
- input->received(&frame);
- }
- //need to compact buffer to preserve any 'extra' data
- inbuf.compact();
- }
- }
- }catch(QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg
- << " (" << error.location.file << ":" << error.location.line
- << ")" << std::endl;
- handleClosed();
- }
-}
diff --git a/Final/cpp/lib/client/Connector.h b/Final/cpp/lib/client/Connector.h
deleted file mode 100644
index 44112369dc..0000000000
--- a/Final/cpp/lib/client/Connector.h
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _Connector_
-#define _Connector_
-
-
-#include <framing/InputHandler.h>
-#include <framing/OutputHandler.h>
-#include <framing/InitiationHandler.h>
-#include <framing/ProtocolInitiation.h>
-#include <ProtocolVersion.h>
-#include <sys/ShutdownHandler.h>
-#include <sys/TimeoutHandler.h>
-#include <sys/Thread.h>
-#include <sys/Monitor.h>
-#include <sys/Socket.h>
-
-namespace qpid {
-namespace client {
-
- class Connector : public qpid::framing::OutputHandler,
- private qpid::sys::Runnable
- {
- const bool debug;
- const int receive_buffer_size;
- const int send_buffer_size;
- qpid::framing::ProtocolVersion version;
-
- volatile bool closed;
-
- int64_t lastIn;
- int64_t lastOut;
- int64_t timeout;
- u_int32_t idleIn;
- u_int32_t idleOut;
-
- qpid::sys::TimeoutHandler* timeoutHandler;
- qpid::sys::ShutdownHandler* shutdownHandler;
- qpid::framing::InputHandler* input;
- qpid::framing::InitiationHandler* initialiser;
- qpid::framing::OutputHandler* output;
-
- qpid::framing::Buffer inbuf;
- qpid::framing::Buffer outbuf;
-
- qpid::sys::Mutex writeLock;
- qpid::sys::Thread receiver;
-
- qpid::sys::Socket socket;
-
- void checkIdle(ssize_t status);
- void writeBlock(qpid::framing::AMQDataBlock* data);
- void writeToSocket(char* data, size_t available);
- void setSocketTimeout();
-
- void run();
- void handleClosed();
- bool markClosed();
-
- public:
- Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024);
- virtual ~Connector();
- virtual void connect(const std::string& host, int port, bool tcpNoDelay=false);
- virtual void init(qpid::framing::ProtocolInitiation* header);
- virtual void close();
- virtual void setInputHandler(qpid::framing::InputHandler* handler);
- virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
- virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
- virtual qpid::framing::OutputHandler* getOutputHandler();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void setReadTimeout(u_int16_t timeout);
- virtual void setWriteTimeout(u_int16_t timeout);
- };
-
-}
-}
-
-
-#endif
diff --git a/Final/cpp/lib/client/IncomingMessage.cpp b/Final/cpp/lib/client/IncomingMessage.cpp
deleted file mode 100644
index 2ff143ba94..0000000000
--- a/Final/cpp/lib/client/IncomingMessage.cpp
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <IncomingMessage.h>
-#include <QpidError.h>
-#include <iostream>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-
-IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){}
-IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){}
-IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){}
-
-IncomingMessage::~IncomingMessage(){
-}
-
-void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
-}
-
-void IncomingMessage::addContent(AMQContentBody::shared_ptr _content){
- this->content.push_back(_content);
-}
-
-bool IncomingMessage::isComplete(){
- return header != 0 && header->getContentSize() == contentSize();
-}
-
-bool IncomingMessage::isReturn(){
- return returned;
-}
-
-bool IncomingMessage::isDelivery(){
- return delivered;
-}
-
-bool IncomingMessage::isResponse(){
- return response;
-}
-
-const string& IncomingMessage::getConsumerTag(){
- if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery");
- return delivered->getConsumerTag();
-}
-
-u_int64_t IncomingMessage::getDeliveryTag(){
- if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery");
- return delivered->getDeliveryTag();
-}
-
-AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){
- return header;
-}
-
-void IncomingMessage::getData(string& s){
- int count(content.size());
- for(int i = 0; i < count; i++){
- if(i == 0) s = content[i]->getData();
- else s += content[i]->getData();
- }
-}
-
-u_int64_t IncomingMessage::contentSize(){
- u_int64_t size(0);
- u_int64_t count(content.size());
- for(u_int64_t i = 0; i < count; i++){
- size += content[i]->size();
- }
- return size;
-}
diff --git a/Final/cpp/lib/client/IncomingMessage.h b/Final/cpp/lib/client/IncomingMessage.h
deleted file mode 100644
index 464e05d877..0000000000
--- a/Final/cpp/lib/client/IncomingMessage.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-#include <vector>
-#include <framing/amqp_framing.h>
-
-#ifndef _IncomingMessage_
-#define _IncomingMessage_
-
-#include <ClientMessage.h>
-
-namespace qpid {
-namespace client {
-
- class IncomingMessage{
- //content will be preceded by one of these method frames
- qpid::framing::BasicDeliverBody::shared_ptr delivered;
- qpid::framing::BasicReturnBody::shared_ptr returned;
- qpid::framing::BasicGetOkBody::shared_ptr response;
- qpid::framing::AMQHeaderBody::shared_ptr header;
- std::vector<qpid::framing::AMQContentBody::shared_ptr> content;
-
- u_int64_t contentSize();
- public:
- IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro);
- IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro);
- IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro);
- ~IncomingMessage();
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr content);
- bool isComplete();
- bool isReturn();
- bool isDelivery();
- bool isResponse();
- const std::string& getConsumerTag();//only relevant if isDelivery()
- qpid::framing::AMQHeaderBody::shared_ptr& getHeader();
- u_int64_t getDeliveryTag();
- void getData(std::string& data);
- };
-
-}
-}
-
-
-#endif
diff --git a/Final/cpp/lib/client/Makefile.am b/Final/cpp/lib/client/Makefile.am
deleted file mode 100644
index 9935da654a..0000000000
--- a/Final/cpp/lib/client/Makefile.am
+++ /dev/null
@@ -1,52 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-AM_CXXFLAGS = $(WARNING_CFLAGS)
-INCLUDES = \
- -I$(top_srcdir)/gen \
- -I$(top_srcdir)/lib/common \
- -I$(top_srcdir)/lib/common/sys \
- -I$(top_srcdir)/lib/common/framing \
- $(APR_CXXFLAGS)
-
-lib_LTLIBRARIES = libqpidclient.la
-libqpidclient_la_LIBADD = ../common/libqpidcommon.la
-libqpidclient_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
-libqpidclient_la_SOURCES = \
- ClientChannel.cpp \
- ClientExchange.cpp \
- ClientMessage.cpp \
- ClientQueue.cpp \
- Connection.cpp \
- Connector.cpp \
- IncomingMessage.cpp \
- MessageListener.cpp \
- ResponseHandler.cpp \
- ReturnedMessageHandler.cpp
-pkginclude_HEADERS = \
- ClientChannel.h \
- ClientExchange.h \
- ClientMessage.h \
- ClientQueue.h \
- Connection.h \
- Connector.h \
- IncomingMessage.h \
- MessageListener.h \
- MethodBodyInstances.h \
- ResponseHandler.h \
- ReturnedMessageHandler.h
diff --git a/Final/cpp/lib/client/MessageListener.cpp b/Final/cpp/lib/client/MessageListener.cpp
deleted file mode 100644
index 70d44e7040..0000000000
--- a/Final/cpp/lib/client/MessageListener.cpp
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <MessageListener.h>
-
-qpid::client::MessageListener::~MessageListener() {}
diff --git a/Final/cpp/lib/client/MessageListener.h b/Final/cpp/lib/client/MessageListener.h
deleted file mode 100644
index cfb917b4f8..0000000000
--- a/Final/cpp/lib/client/MessageListener.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-
-#ifndef _MessageListener_
-#define _MessageListener_
-
-#include <ClientMessage.h>
-
-namespace qpid {
-namespace client {
-
- /**
- * An interface through which asynchronously delivered messages
- * can be received by an application.
- *
- * @see Channel::consume()
- *
- * \ingroup clientapi
- */
- class MessageListener{
- public:
- virtual ~MessageListener();
- virtual void received(Message& msg) = 0;
- };
-
-}
-}
-
-
-#endif
diff --git a/Final/cpp/lib/client/MethodBodyInstances.h b/Final/cpp/lib/client/MethodBodyInstances.h
deleted file mode 100644
index 3ab0c9af8f..0000000000
--- a/Final/cpp/lib/client/MethodBodyInstances.h
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <framing/amqp_framing.h>
-
-#ifndef _MethodBodyInstances_h_
-#define _MethodBodyInstances_h_
-
-namespace qpid {
-namespace client {
-
-/**
- * A list of method body instances that can be used to compare against
- * incoming bodies.
- */
-class MethodBodyInstances
-{
-private:
- qpid::framing::ProtocolVersion version;
-public:
- const qpid::framing::BasicCancelOkBody basic_cancel_ok;
- const qpid::framing::BasicConsumeOkBody basic_consume_ok;
- const qpid::framing::BasicDeliverBody basic_deliver;
- const qpid::framing::BasicGetEmptyBody basic_get_empty;
- const qpid::framing::BasicGetOkBody basic_get_ok;
- const qpid::framing::BasicQosOkBody basic_qos_ok;
- const qpid::framing::BasicReturnBody basic_return;
- const qpid::framing::ChannelCloseBody channel_close;
- const qpid::framing::ChannelCloseOkBody channel_close_ok;
- const qpid::framing::ChannelFlowBody channel_flow;
- const qpid::framing::ChannelOpenOkBody channel_open_ok;
- const qpid::framing::ConnectionCloseBody connection_close;
- const qpid::framing::ConnectionCloseOkBody connection_close_ok;
- const qpid::framing::ConnectionOpenOkBody connection_open_ok;
- const qpid::framing::ConnectionRedirectBody connection_redirect;
- const qpid::framing::ConnectionStartBody connection_start;
- const qpid::framing::ConnectionTuneBody connection_tune;
- const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok;
- const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok;
- const qpid::framing::QueueDeclareOkBody queue_declare_ok;
- const qpid::framing::QueueDeleteOkBody queue_delete_ok;
- const qpid::framing::QueueBindOkBody queue_bind_ok;
- const qpid::framing::TxCommitOkBody tx_commit_ok;
- const qpid::framing::TxRollbackOkBody tx_rollback_ok;
- const qpid::framing::TxSelectOkBody tx_select_ok;
-
- MethodBodyInstances(u_int8_t major, u_int8_t minor) :
- version(major, minor),
- basic_cancel_ok(version),
- basic_consume_ok(version),
- basic_deliver(version),
- basic_get_empty(version),
- basic_get_ok(version),
- basic_qos_ok(version),
- basic_return(version),
- channel_close(version),
- channel_close_ok(version),
- channel_flow(version),
- channel_open_ok(version),
- connection_close(version),
- connection_close_ok(version),
- connection_open_ok(version),
- connection_redirect(version),
- connection_start(version),
- connection_tune(version),
- exchange_declare_ok(version),
- exchange_delete_ok(version),
- queue_declare_ok(version),
- queue_delete_ok(version),
- queue_bind_ok(version),
- tx_commit_ok(version),
- tx_rollback_ok(version),
- tx_select_ok(version)
- {}
-
-};
-
-static MethodBodyInstances method_bodies(8, 0);
-
-} // namespace client
-} // namespace qpid
-
-#endif
diff --git a/Final/cpp/lib/client/ResponseHandler.cpp b/Final/cpp/lib/client/ResponseHandler.cpp
deleted file mode 100644
index ac8b4a9ced..0000000000
--- a/Final/cpp/lib/client/ResponseHandler.cpp
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <ResponseHandler.h>
-#include <sys/Monitor.h>
-#include <QpidError.h>
-
-using namespace qpid::sys;
-
-qpid::client::ResponseHandler::ResponseHandler() : waiting(false){}
-
-qpid::client::ResponseHandler::~ResponseHandler(){}
-
-bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
- return expected.match(response.get());
-}
-
-void qpid::client::ResponseHandler::waitForResponse(){
- Monitor::ScopedLock l(monitor);
- if(waiting){
- monitor.wait();
- }
-}
-
-void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){
- response = _response;
- Monitor::ScopedLock l(monitor);
- waiting = false;
- monitor.notify();
-}
-
-void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
- Monitor::ScopedLock l(monitor);
- if(waiting){
- monitor.wait();
- }
- if(!validate(expected)){
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
- }
-}
-
-void qpid::client::ResponseHandler::expect(){
- waiting = true;
-}
diff --git a/Final/cpp/lib/client/ResponseHandler.h b/Final/cpp/lib/client/ResponseHandler.h
deleted file mode 100644
index c3d499d046..0000000000
--- a/Final/cpp/lib/client/ResponseHandler.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-#include <framing/amqp_framing.h>
-#include <sys/Monitor.h>
-
-#ifndef _ResponseHandler_
-#define _ResponseHandler_
-
-namespace qpid {
- namespace client {
-
- class ResponseHandler{
- bool waiting;
- qpid::framing::AMQMethodBody::shared_ptr response;
- qpid::sys::Monitor monitor;
-
- public:
- ResponseHandler();
- ~ResponseHandler();
- inline bool isWaiting(){ return waiting; }
- inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; }
- bool validate(const qpid::framing::AMQMethodBody& expected);
- void waitForResponse();
- void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response);
- void receive(const qpid::framing::AMQMethodBody& expected);
- void expect();//must be called before calling receive
- };
-
- }
-}
-
-
-#endif
diff --git a/Final/cpp/lib/client/ReturnedMessageHandler.cpp b/Final/cpp/lib/client/ReturnedMessageHandler.cpp
deleted file mode 100644
index ee9f7462ef..0000000000
--- a/Final/cpp/lib/client/ReturnedMessageHandler.cpp
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <ReturnedMessageHandler.h>
-
-qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {}
diff --git a/Final/cpp/lib/client/ReturnedMessageHandler.h b/Final/cpp/lib/client/ReturnedMessageHandler.h
deleted file mode 100644
index 137f0b2e17..0000000000
--- a/Final/cpp/lib/client/ReturnedMessageHandler.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-
-#ifndef _ReturnedMessageHandler_
-#define _ReturnedMessageHandler_
-
-#include <ClientMessage.h>
-
-namespace qpid {
-namespace client {
-
- /**
- * An interface through which returned messages can be received by
- * an application.
- *
- * @see Channel::setReturnedMessageHandler()
- *
- * \ingroup clientapi
- */
- class ReturnedMessageHandler{
- public:
- virtual ~ReturnedMessageHandler();
- virtual void returned(Message& msg) = 0;
- };
-
-}
-}
-
-
-#endif