summaryrefslogtreecommitdiff
path: root/Final/cpp/lib/client
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-11-20 14:01:22 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-11-20 14:01:22 +0000
commite162ae8c89bfc29293ed84d552dbb373dd1c06f3 (patch)
tree4d0737f7a4324dfd93c4b519cce1e6998e46cf77 /Final/cpp/lib/client
parentbcd011a10c0db4ffc6f78380c548d673e270e000 (diff)
downloadqpid-python-e162ae8c89bfc29293ed84d552dbb373dd1c06f3.tar.gz
Creating tag for M2 final release
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/tags/M2@596674 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'Final/cpp/lib/client')
-rw-r--r--Final/cpp/lib/client/ClientChannel.cpp434
-rw-r--r--Final/cpp/lib/client/ClientChannel.h321
-rw-r--r--Final/cpp/lib/client/ClientExchange.cpp34
-rw-r--r--Final/cpp/lib/client/ClientExchange.h106
-rw-r--r--Final/cpp/lib/client/ClientMessage.cpp150
-rw-r--r--Final/cpp/lib/client/ClientMessage.h114
-rw-r--r--Final/cpp/lib/client/ClientQueue.cpp58
-rw-r--r--Final/cpp/lib/client/ClientQueue.h104
-rw-r--r--Final/cpp/lib/client/Connection.cpp250
-rw-r--r--Final/cpp/lib/client/Connection.h182
-rw-r--r--Final/cpp/lib/client/Connector.cpp195
-rw-r--r--Final/cpp/lib/client/Connector.h97
-rw-r--r--Final/cpp/lib/client/IncomingMessage.cpp88
-rw-r--r--Final/cpp/lib/client/IncomingMessage.h63
-rw-r--r--Final/cpp/lib/client/Makefile.am52
-rw-r--r--Final/cpp/lib/client/MessageListener.cpp24
-rw-r--r--Final/cpp/lib/client/MessageListener.h49
-rw-r--r--Final/cpp/lib/client/MethodBodyInstances.h100
-rw-r--r--Final/cpp/lib/client/ResponseHandler.cpp61
-rw-r--r--Final/cpp/lib/client/ResponseHandler.h52
-rw-r--r--Final/cpp/lib/client/ReturnedMessageHandler.cpp24
-rw-r--r--Final/cpp/lib/client/ReturnedMessageHandler.h49
22 files changed, 2607 insertions, 0 deletions
diff --git a/Final/cpp/lib/client/ClientChannel.cpp b/Final/cpp/lib/client/ClientChannel.cpp
new file mode 100644
index 0000000000..a97d79dcf9
--- /dev/null
+++ b/Final/cpp/lib/client/ClientChannel.cpp
@@ -0,0 +1,434 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ClientChannel.h>
+#include <sys/Monitor.h>
+#include <ClientMessage.h>
+#include <QpidError.h>
+#include <MethodBodyInstances.h>
+
+using namespace boost; //to use dynamic_pointer_cast
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+Channel::Channel(bool _transactional, u_int16_t _prefetch) :
+ id(0),
+ con(0),
+ out(0),
+ incoming(0),
+ closed(true),
+ prefetch(_prefetch),
+ transactional(_transactional),
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ version(8, 0)
+{ }
+
+Channel::~Channel(){
+ stop();
+}
+
+void Channel::setPrefetch(u_int16_t _prefetch){
+ prefetch = _prefetch;
+ if(con != 0 && out != 0){
+ setQos();
+ }
+}
+
+void Channel::setQos(){
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
+ if(transactional){
+ sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok);
+ }
+}
+
+void Channel::declareExchange(Exchange& exchange, bool synch){
+ string name = exchange.getName();
+ string type = exchange.getType();
+ FieldTable args;
+ AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
+ if(synch){
+ sendAndReceive(frame, method_bodies.exchange_declare_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::deleteExchange(Exchange& exchange, bool synch){
+ string name = exchange.getName();
+ AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch));
+ if(synch){
+ sendAndReceive(frame, method_bodies.exchange_delete_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::declareQueue(Queue& queue, bool synch){
+ string name = queue.getName();
+ FieldTable args;
+ AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false/*passive*/, queue.isDurable(),
+ queue.isExclusive(),
+ queue.isAutoDelete(), !synch, args));
+ if(synch){
+ sendAndReceive(frame, method_bodies.queue_declare_ok);
+ if(queue.getName().length() == 0){
+ QueueDeclareOkBody::shared_ptr response =
+ dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
+ queue.setName(response->getQueue());
+ }
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
+ //ticket, queue, ifunused, ifempty, nowait
+ string name = queue.getName();
+ AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
+ if(synch){
+ sendAndReceive(frame, method_bodies.queue_delete_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
+ string e = exchange.getName();
+ string q = queue.getName();
+ AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args));
+ if(synch){
+ sendAndReceive(frame, method_bodies.queue_bind_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ int ackMode, bool noLocal, bool synch, const FieldTable* fields)
+{
+ string q = queue.getName();
+ AMQFrame* frame =
+ new AMQFrame(version,
+ id,
+ new BasicConsumeBody(
+ version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable()));
+ if(synch){
+ sendAndReceive(frame, method_bodies.basic_consume_ok);
+ BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
+ tag = response->getConsumerTag();
+ }else{
+ out->send(frame);
+ }
+ Consumer* c = new Consumer();
+ c->listener = listener;
+ c->ackMode = ackMode;
+ c->lastDeliveryTag = 0;
+ consumers[tag] = c;
+}
+
+void Channel::cancel(std::string& tag, bool synch){
+ Consumer* c = consumers[tag];
+ if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
+ out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
+ }
+
+ AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch));
+ if(synch){
+ sendAndReceive(frame, method_bodies.basic_cancel_ok);
+ }else{
+ out->send(frame);
+ }
+ consumers.erase(tag);
+ if(c != 0){
+ delete c;
+ }
+}
+
+void Channel::cancelAll(){
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
+ Consumer* c = i->second;
+ if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
+ out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
+ }
+ consumers.erase(i);
+ delete c;
+ }
+}
+
+void Channel::retrieve(Message& msg){
+ Monitor::ScopedLock l(retrievalMonitor);
+ while(retrieved == 0){
+ retrievalMonitor.wait();
+ }
+
+ msg.header = retrieved->getHeader();
+ msg.deliveryTag = retrieved->getDeliveryTag();
+ retrieved->getData(msg.data);
+ delete retrieved;
+ retrieved = 0;
+}
+
+bool Channel::get(Message& msg, const Queue& queue, int ackMode){
+ string name = queue.getName();
+ AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode));
+ responses.expect();
+ out->send(frame);
+ responses.waitForResponse();
+ AMQMethodBody::shared_ptr response = responses.getResponse();
+ if(method_bodies.basic_get_ok.match(response.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
+ }
+ retrieve(msg);
+ return true;
+ }if(method_bodies.basic_get_empty.match(response.get())){
+ return false;
+ }else{
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
+ }
+}
+
+
+void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
+ string e = exchange.getName();
+ string key = routingKey;
+
+ out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+ //break msg up into header frame and content frame(s) and send these
+ string data = msg.getData();
+ msg.header->setContentSize(data.length());
+ AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
+ out->send(new AMQFrame(version, id, body));
+
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
+ if(data_length < frag_size){
+ out->send(new AMQFrame(version, id, new AMQContentBody(data)));
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ out->send(new AMQFrame(version, id, new AMQContentBody(frag)));
+
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+}
+
+void Channel::commit(){
+ AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version));
+ sendAndReceive(frame, method_bodies.tx_commit_ok);
+}
+
+void Channel::rollback(){
+ AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version));
+ sendAndReceive(frame, method_bodies.tx_rollback_ok);
+}
+
+void Channel::handleMethod(AMQMethodBody::shared_ptr body){
+ //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
+ if(method_bodies.basic_deliver.match(body.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
+ }
+ }else if(method_bodies.basic_return.match(body.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
+ }
+ }else if(method_bodies.channel_close.match(body.get())){
+ con->removeChannel(this);
+ //need to signal application that channel has been closed through exception
+
+ }else if(method_bodies.channel_flow.match(body.get())){
+
+ } else if(responses.isWaiting()){
+ responses.signalResponse(body);
+ }else{
+ //signal error
+ std::cout << "Unhandled method: " << *body << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
+ }
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
+ if(incoming == 0){
+ //handle invalid frame sequence
+ std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
+ }else{
+ incoming->setHeader(body);
+ if(incoming->isComplete()){
+ enqueue();
+ }
+ }
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr body){
+ if(incoming == 0){
+ //handle invalid frame sequence
+ std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
+ }else{
+ incoming->addContent(body);
+ if(incoming->isComplete()){
+ enqueue();
+ }
+ }
+}
+
+void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
+}
+
+void Channel::start(){
+ dispatcher = Thread(this);
+}
+
+void Channel::stop(){
+ {
+ Monitor::ScopedLock l(dispatchMonitor);
+ closed = true;
+ dispatchMonitor.notify();
+ }
+ dispatcher.join();
+}
+
+void Channel::run(){
+ dispatch();
+}
+
+void Channel::enqueue(){
+ if(incoming->isResponse()){
+ Monitor::ScopedLock l(retrievalMonitor);
+ retrieved = incoming;
+ retrievalMonitor.notify();
+ }else{
+ Monitor::ScopedLock l(dispatchMonitor);
+ messages.push(incoming);
+ dispatchMonitor.notify();
+ }
+ incoming = 0;
+}
+
+IncomingMessage* Channel::dequeue(){
+ Monitor::ScopedLock l(dispatchMonitor);
+ while(messages.empty() && !closed){
+ dispatchMonitor.wait();
+ }
+ IncomingMessage* msg = 0;
+ if(!messages.empty()){
+ msg = messages.front();
+ messages.pop();
+ }
+ return msg;
+}
+
+void Channel::deliver(Consumer* consumer, Message& msg){
+ //record delivery tag:
+ consumer->lastDeliveryTag = msg.getDeliveryTag();
+
+ //allow registered listener to handle the message
+ consumer->listener->received(msg);
+
+ //if the handler calls close on the channel or connection while
+ //handling this message, then consumer will now have been deleted.
+ if(!closed){
+ bool multiple(false);
+ switch(consumer->ackMode){
+ case LAZY_ACK:
+ multiple = true;
+ if(++(consumer->count) < prefetch) break;
+ //else drop-through
+ case AUTO_ACK:
+ out->send(new AMQFrame(version, id, new BasicAckBody(version, msg.getDeliveryTag(), multiple)));
+ consumer->lastDeliveryTag = 0;
+ }
+ }
+
+ //as it stands, transactionality is entirely orthogonal to ack
+ //mode, though the acks will not be processed by the broker under
+ //a transaction until it commits.
+}
+
+void Channel::dispatch(){
+ while(!closed){
+ IncomingMessage* incomingMsg = dequeue();
+ if(incomingMsg){
+ //Note: msg is currently only valid for duration of this call
+ Message msg(incomingMsg->getHeader());
+ incomingMsg->getData(msg.data);
+ if(incomingMsg->isReturn()){
+ if(returnsHandler == 0){
+ //print warning to log/console
+ std::cout << "Message returned: " << msg.getData() << std::endl;
+ }else{
+ returnsHandler->returned(msg);
+ }
+ }else{
+ msg.deliveryTag = incomingMsg->getDeliveryTag();
+ std::string tag = incomingMsg->getConsumerTag();
+
+ if(consumers[tag] == 0){
+ //signal error
+ std::cout << "Unknown consumer: " << tag << std::endl;
+ }else{
+ deliver(consumers[tag], msg);
+ }
+ }
+ delete incomingMsg;
+ }
+ }
+}
+
+void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+ returnsHandler = handler;
+}
+
+void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
+ responses.expect();
+ out->send(frame);
+ responses.receive(body);
+}
+
+void Channel::close(){
+ if(con != 0){
+ con->closeChannel(this);
+ }
+}
diff --git a/Final/cpp/lib/client/ClientChannel.h b/Final/cpp/lib/client/ClientChannel.h
new file mode 100644
index 0000000000..066f837430
--- /dev/null
+++ b/Final/cpp/lib/client/ClientChannel.h
@@ -0,0 +1,321 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include <queue>
+#include "sys/types.h"
+
+#ifndef _Channel_
+#define _Channel_
+
+#include <framing/amqp_framing.h>
+#include <Connection.h>
+#include <ClientExchange.h>
+#include <IncomingMessage.h>
+#include <ClientMessage.h>
+#include <MessageListener.h>
+#include <ClientQueue.h>
+#include <ResponseHandler.h>
+#include <ReturnedMessageHandler.h>
+
+namespace qpid {
+namespace client {
+ /**
+ * The available acknowledgements modes
+ *
+ * \ingroup clientapi
+ */
+ enum ack_modes {
+ /** No acknowledgement will be sent, broker can
+ discard messages as soon as they are delivered
+ to a consumer using this mode. **/
+ NO_ACK = 0,
+ /** Each message will be automatically
+ acknowledged as soon as it is delivered to the
+ application **/
+ AUTO_ACK = 1,
+ /** Acknowledgements will be sent automatically,
+ but not for each message. **/
+ LAZY_ACK = 2,
+ /** The application is responsible for explicitly
+ acknowledging messages. **/
+ CLIENT_ACK = 3
+ };
+
+ /**
+ * Represents an AMQP channel, i.e. loosely a session of work. It
+ * is through a channel that most of the AMQP 'methods' are
+ * exposed.
+ *
+ * \ingroup clientapi
+ */
+ class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::sys::Runnable{
+ struct Consumer{
+ MessageListener* listener;
+ int ackMode;
+ int count;
+ u_int64_t lastDeliveryTag;
+ };
+ typedef std::map<std::string,Consumer*>::iterator consumer_iterator;
+
+ u_int16_t id;
+ Connection* con;
+ qpid::sys::Thread dispatcher;
+ qpid::framing::OutputHandler* out;
+ IncomingMessage* incoming;
+ ResponseHandler responses;
+ std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
+ IncomingMessage* retrieved;//holds response to basic.get
+ qpid::sys::Monitor dispatchMonitor;
+ qpid::sys::Monitor retrievalMonitor;
+ std::map<std::string, Consumer*> consumers;
+ ReturnedMessageHandler* returnsHandler;
+ bool closed;
+
+ u_int16_t prefetch;
+ const bool transactional;
+ qpid::framing::ProtocolVersion version;
+
+ void enqueue();
+ void retrieve(Message& msg);
+ IncomingMessage* dequeue();
+ void dispatch();
+ void stop();
+ void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
+ void deliver(Consumer* consumer, Message& msg);
+ void setQos();
+ void cancelAll();
+
+ virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
+ virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
+ virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
+ virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+ public:
+ /**
+ * Creates a channel object.
+ *
+ * @param transactional if true, the publishing and acknowledgement
+ * of messages will be transactional and can be committed or
+ * aborted in atomic units (@see commit(), @see rollback())
+ *
+ * @param prefetch specifies the number of unacknowledged
+ * messages the channel is willing to have sent to it
+ * asynchronously
+ */
+ Channel(bool transactional = false, u_int16_t prefetch = 500);
+ ~Channel();
+
+ /**
+ * Declares an exchange.
+ *
+ * In AMQP Exchanges are the destinations to which messages
+ * are published. They have Queues bound to them and route
+ * messages they receive to those queues. The routing rules
+ * depend on the type of the exchange.
+ *
+ * @param exchange an Exchange object representing the
+ * exchange to declare
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void declareExchange(Exchange& exchange, bool synch = true);
+ /**
+ * Deletes an exchange
+ *
+ * @param exchange an Exchange object representing the exchange to delete
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void deleteExchange(Exchange& exchange, bool synch = true);
+ /**
+ * Declares a Queue
+ *
+ * @param queue a Queue object representing the queue to declare
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void declareQueue(Queue& queue, bool synch = true);
+ /**
+ * Deletes a Queue
+ *
+ * @param queue a Queue object representing the queue to delete
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
+ /**
+ * Binds a queue to an exchange. The exact semantics of this
+ * (in particular how 'routing keys' and 'binding arguments'
+ * are used) depends on the type of the exchange.
+ *
+ * @param exchange an Exchange object representing the
+ * exchange to bind to
+ *
+ * @param queue a Queue object representing the queue to be
+ * bound
+ *
+ * @param key the 'routing key' for the binding
+ *
+ * @param args the 'binding arguments' for the binding
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
+ const qpid::framing::FieldTable& args, bool synch = true);
+ /**
+ * Creates a 'consumer' for a queue. Messages in (or arriving
+ * at) that queue will be delivered to consumers
+ * asynchronously.
+ *
+ * @param queue a Queue instance representing the queue to
+ * consume from
+ *
+ * @param tag an identifier to associate with the consumer
+ * that can be used to cancel its subscription (if empty, this
+ * will be assigned by the broker)
+ *
+ * @param listener a pointer to an instance of an
+ * implementation of the MessageListener interface. Messages
+ * received from this queue for this consumer will result in
+ * invocation of the received() method on the listener, with
+ * the message itself passed in.
+ *
+ * @param ackMode the mode of acknowledgement that the broker
+ * should assume for this consumer. @see ack_modes
+ *
+ * @param noLocal if true, this consumer will not be sent any
+ * message published by this connection
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const qpid::framing::FieldTable* fields = 0);
+
+ /**
+ * Cancels a subscription previously set up through a call to consume().
+ *
+ * @param tag the identifier used (or assigned) in the consume
+ * request that set up the subscription to be cancelled.
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void cancel(std::string& tag, bool synch = true);
+ /**
+ * Synchronous pull of a message from a queue.
+ *
+ * @param msg a message object that will contain the message
+ * headers and content if the call completes.
+ *
+ * @param queue the queue to consume from
+ *
+ * @param ackMode the acknowledgement mode to use (@see
+ * ack_modes)
+ *
+ * @return true if a message was succcessfully dequeued from
+ * the queue, false if the queue was empty.
+ */
+ bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
+ /**
+ * Publishes (i.e. sends a message to the broker).
+ *
+ * @param msg the message to publish
+ *
+ * @param exchange the exchange to publish the message to
+ *
+ * @param routingKey the routing key to publish with
+ *
+ * @param mandatory if true and the exchange to which this
+ * publish is directed has no matching bindings, the message
+ * will be returned (see setReturnedMessageHandler()).
+ *
+ * @param immediate if true and there is no consumer to
+ * receive this message on publication, the message will be
+ * returned (see setReturnedMessageHandler()).
+ */
+ void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
+
+ /**
+ * For a transactional channel this will commit all
+ * publications and acknowledgements since the last commit (or
+ * the channel was opened if there has been no previous
+ * commit). This will cause published messages to become
+ * available to consumers and acknowledged messages to be
+ * consumed and removed from the queues they were dispatched
+ * from.
+ *
+ * Transactionailty of a channel is specified when the channel
+ * object is created (@see Channel()).
+ */
+ void commit();
+ /**
+ * For a transactional channel, this will rollback any
+ * publications or acknowledgements. It will be as if the
+ * ppblished messages were never sent and the acknowledged
+ * messages were never consumed.
+ */
+ void rollback();
+
+ /**
+ * Change the prefetch in use.
+ */
+ void setPrefetch(u_int16_t prefetch);
+
+ /**
+ * Start message dispatching on a new thread
+ */
+ void start();
+ /**
+ * Do message dispatching on this thread
+ */
+ void run();
+
+ /**
+ * Closes a channel, stopping any message dispatching.
+ */
+ void close();
+
+ /**
+ * Set a handler for this channel that will process any
+ * returned messages
+ *
+ * @see publish()
+ */
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+ friend class Connection;
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/client/ClientExchange.cpp b/Final/cpp/lib/client/ClientExchange.cpp
new file mode 100644
index 0000000000..5e5f3f14c6
--- /dev/null
+++ b/Final/cpp/lib/client/ClientExchange.cpp
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ClientExchange.h>
+
+qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){}
+const std::string& qpid::client::Exchange::getName() const { return name; }
+const std::string& qpid::client::Exchange::getType() const { return type; }
+
+const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct";
+const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic";
+const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers";
+
+const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE);
diff --git a/Final/cpp/lib/client/ClientExchange.h b/Final/cpp/lib/client/ClientExchange.h
new file mode 100644
index 0000000000..a8ac21fa9b
--- /dev/null
+++ b/Final/cpp/lib/client/ClientExchange.h
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _Exchange_
+#define _Exchange_
+
+namespace qpid {
+namespace client {
+
+ /**
+ * A 'handle' used to represent an AMQP exchange in the Channel
+ * methods. Exchanges are the destinations to which messages are
+ * published.
+ *
+ * There are different types of exchange (the standard types are
+ * available as static constants, see DIRECT_EXCHANGE,
+ * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to
+ * an exchange using Channel::bind() and messages published to
+ * that exchange are then routed to the queue based on the details
+ * of the binding and the type of exchange.
+ *
+ * There are some standard exchange instances that are predeclared
+ * on all AMQP brokers. These are defined as static members
+ * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and
+ * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange
+ * (member DEFAULT_EXCHANGE) which is nameless and of type
+ * 'direct' and has every declared queue bound to it by queue
+ * name.
+ *
+ * \ingroup clientapi
+ */
+ class Exchange{
+ const std::string name;
+ const std::string type;
+
+ public:
+ /**
+ * A direct exchange routes messages published with routing
+ * key X to any queue bound with key X (i.e. an exact match is
+ * used).
+ */
+ static const std::string DIRECT_EXCHANGE;
+ /**
+ * A topic exchange treat the key with which a queue is bound
+ * as a pattern and routes all messages whose routing keys
+ * match that pattern to the bound queue. The routing key for
+ * a message must consist of zero or more alpha-numeric words
+ * delimited by dots. The pattern is of a similar form but *
+ * can be used to match excatly one word and # can be used to
+ * match zero or more words.
+ */
+ static const std::string TOPIC_EXCHANGE;
+ /**
+ * The headers exchange routes messages based on whether their
+ * headers match the binding arguments specified when
+ * binding. (see the AMQP spec for more details).
+ */
+ static const std::string HEADERS_EXCHANGE;
+
+ /**
+ * The 'default' exchange, nameless and of type 'direct'. Has
+ * every declared queue bound to it by name.
+ */
+ static const Exchange DEFAULT_EXCHANGE;
+ /**
+ * The standard direct exchange, named amq.direct.
+ */
+ static const Exchange STANDARD_DIRECT_EXCHANGE;
+ /**
+ * The standard topic exchange, named amq.topic.
+ */
+ static const Exchange STANDARD_TOPIC_EXCHANGE;
+ /**
+ * The standard headers exchange, named amq.header.
+ */
+ static const Exchange STANDARD_HEADERS_EXCHANGE;
+
+ Exchange(std::string name, std::string type = DIRECT_EXCHANGE);
+ const std::string& getName() const;
+ const std::string& getType() const;
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/client/ClientMessage.cpp b/Final/cpp/lib/client/ClientMessage.cpp
new file mode 100644
index 0000000000..e8a2a6019e
--- /dev/null
+++ b/Final/cpp/lib/client/ClientMessage.cpp
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ClientMessage.h>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+Message::Message(){
+ header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC));
+}
+
+Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){
+}
+
+Message::~Message(){
+}
+
+BasicHeaderProperties* Message::getHeaderProperties(){
+ return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+}
+
+const std::string& Message::getContentType(){
+ return getHeaderProperties()->getContentType();
+}
+
+const std::string& Message::getContentEncoding(){
+ return getHeaderProperties()->getContentEncoding();
+}
+
+FieldTable& Message::getHeaders(){
+ return getHeaderProperties()->getHeaders();
+}
+
+u_int8_t Message::getDeliveryMode(){
+ return getHeaderProperties()->getDeliveryMode();
+}
+
+u_int8_t Message::getPriority(){
+ return getHeaderProperties()->getPriority();
+}
+
+const std::string& Message::getCorrelationId(){
+ return getHeaderProperties()->getCorrelationId();
+}
+
+const std::string& Message::getReplyTo(){
+ return getHeaderProperties()->getReplyTo();
+}
+
+const std::string& Message::getExpiration(){
+ return getHeaderProperties()->getExpiration();
+}
+
+const std::string& Message::getMessageId(){
+ return getHeaderProperties()->getMessageId();
+}
+
+u_int64_t Message::getTimestamp(){
+ return getHeaderProperties()->getTimestamp();
+}
+
+const std::string& Message::getType(){
+ return getHeaderProperties()->getType();
+}
+
+const std::string& Message::getUserId(){
+ return getHeaderProperties()->getUserId();
+}
+
+const std::string& Message::getAppId(){
+ return getHeaderProperties()->getAppId();
+}
+
+const std::string& Message::getClusterId(){
+ return getHeaderProperties()->getClusterId();
+}
+
+void Message::setContentType(const std::string& type){
+ getHeaderProperties()->setContentType(type);
+}
+
+void Message::setContentEncoding(const std::string& encoding){
+ getHeaderProperties()->setContentEncoding(encoding);
+}
+
+void Message::setHeaders(const FieldTable& headers){
+ getHeaderProperties()->setHeaders(headers);
+}
+
+void Message::setDeliveryMode(u_int8_t mode){
+ getHeaderProperties()->setDeliveryMode(mode);
+}
+
+void Message::setPriority(u_int8_t priority){
+ getHeaderProperties()->setPriority(priority);
+}
+
+void Message::setCorrelationId(const std::string& correlationId){
+ getHeaderProperties()->setCorrelationId(correlationId);
+}
+
+void Message::setReplyTo(const std::string& replyTo){
+ getHeaderProperties()->setReplyTo(replyTo);
+}
+
+void Message::setExpiration(const std::string& expiration){
+ getHeaderProperties()->setExpiration(expiration);
+}
+
+void Message::setMessageId(const std::string& messageId){
+ getHeaderProperties()->setMessageId(messageId);
+}
+
+void Message::setTimestamp(u_int64_t timestamp){
+ getHeaderProperties()->setTimestamp(timestamp);
+}
+
+void Message::setType(const std::string& type){
+ getHeaderProperties()->setType(type);
+}
+
+void Message::setUserId(const std::string& userId){
+ getHeaderProperties()->setUserId(userId);
+}
+
+void Message::setAppId(const std::string& appId){
+ getHeaderProperties()->setAppId(appId);
+}
+
+void Message::setClusterId(const std::string& clusterId){
+ getHeaderProperties()->setClusterId(clusterId);
+}
diff --git a/Final/cpp/lib/client/ClientMessage.h b/Final/cpp/lib/client/ClientMessage.h
new file mode 100644
index 0000000000..b46eb0bc72
--- /dev/null
+++ b/Final/cpp/lib/client/ClientMessage.h
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <framing/amqp_framing.h>
+
+#ifndef _Message_
+#define _Message_
+
+
+namespace qpid {
+namespace client {
+
+ /**
+ * A representation of messages for sent or recived through the
+ * client api.
+ *
+ * \ingroup clientapi
+ */
+ class Message{
+ qpid::framing::AMQHeaderBody::shared_ptr header;
+ std::string data;
+ bool redelivered;
+ u_int64_t deliveryTag;
+
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
+ public:
+ Message();
+ ~Message();
+
+ /**
+ * Allows the application to access the content of messages
+ * received.
+ *
+ * @return a string representing the data of the message
+ */
+ inline std::string getData(){ return data; }
+ /**
+ * Allows the application to set the content of messages to be
+ * sent.
+ *
+ * @param data a string representing the data of the message
+ */
+ inline void setData(const std::string& _data){ data = _data; }
+
+ /**
+ * @return true if this message was delivered previously (to
+ * any consumer) but was not acknowledged.
+ */
+ inline bool isRedelivered(){ return redelivered; }
+ inline void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
+
+ inline u_int64_t getDeliveryTag(){ return deliveryTag; }
+
+ const std::string& getContentType();
+ const std::string& getContentEncoding();
+ qpid::framing::FieldTable& getHeaders();
+ u_int8_t getDeliveryMode();
+ u_int8_t getPriority();
+ const std::string& getCorrelationId();
+ const std::string& getReplyTo();
+ const std::string& getExpiration();
+ const std::string& getMessageId();
+ u_int64_t getTimestamp();
+ const std::string& getType();
+ const std::string& getUserId();
+ const std::string& getAppId();
+ const std::string& getClusterId();
+
+ void setContentType(const std::string& type);
+ void setContentEncoding(const std::string& encoding);
+ void setHeaders(const qpid::framing::FieldTable& headers);
+ /**
+ * Sets the delivery mode. 1 = non-durable, 2 = durable.
+ */
+ void setDeliveryMode(u_int8_t mode);
+ void setPriority(u_int8_t priority);
+ void setCorrelationId(const std::string& correlationId);
+ void setReplyTo(const std::string& replyTo);
+ void setExpiration(const std::string& expiration);
+ void setMessageId(const std::string& messageId);
+ void setTimestamp(u_int64_t timestamp);
+ void setType(const std::string& type);
+ void setUserId(const std::string& userId);
+ void setAppId(const std::string& appId);
+ void setClusterId(const std::string& clusterId);
+
+
+ friend class Channel;
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/client/ClientQueue.cpp b/Final/cpp/lib/client/ClientQueue.cpp
new file mode 100644
index 0000000000..773be504d8
--- /dev/null
+++ b/Final/cpp/lib/client/ClientQueue.cpp
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ClientQueue.h>
+
+qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){}
+
+qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){}
+
+qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){}
+
+qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable)
+ : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){}
+
+const std::string& qpid::client::Queue::getName() const{
+ return name;
+}
+
+void qpid::client::Queue::setName(const std::string& _name){
+ name = _name;
+}
+
+bool qpid::client::Queue::isAutoDelete() const{
+ return autodelete;
+}
+
+bool qpid::client::Queue::isExclusive() const{
+ return exclusive;
+}
+
+bool qpid::client::Queue::isDurable() const{
+ return durable;
+}
+
+void qpid::client::Queue::setDurable(bool _durable){
+ durable = _durable;
+}
+
+
+
+
diff --git a/Final/cpp/lib/client/ClientQueue.h b/Final/cpp/lib/client/ClientQueue.h
new file mode 100644
index 0000000000..4a63097c55
--- /dev/null
+++ b/Final/cpp/lib/client/ClientQueue.h
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _Queue_
+#define _Queue_
+
+namespace qpid {
+namespace client {
+
+ /**
+ * A 'handle' used to represent an AMQP queue in the Channel
+ * methods. Creating an instance of this class does not cause the
+ * queue to be created on the broker. Rather, an instance of this
+ * class should be passed to Channel::declareQueue() to ensure
+ * that the queue exists or is created.
+ *
+ * Queues hold messages and allow clients to consume
+ * (see Channel::consume()) or get (see Channel::get()) those messags. A
+ * queue receives messages by being bound to one or more Exchange;
+ * messages published to that exchange may then be routed to the
+ * queue based on the details of the binding and the type of the
+ * exchange (see Channel::bind()).
+ *
+ * Queues are identified by a name. They can be exclusive (in which
+ * case they can only be used in the context of the connection
+ * over which they were declared, and are deleted when then
+ * connection closes), or they can be shared. Shared queues can be
+ * auto deleted when they have no consumers.
+ *
+ * We use the term 'temporary queue' to refer to an exclusive
+ * queue.
+ *
+ * \ingroup clientapi
+ */
+ class Queue{
+ std::string name;
+ const bool autodelete;
+ const bool exclusive;
+ bool durable;
+
+ public:
+
+ /**
+ * Creates an unnamed, non-durable, temporary queue. A name
+ * will be assigned to this queue instance by a call to
+ * Channel::declareQueue().
+ */
+ Queue();
+ /**
+ * Creates a shared, non-durable, queue with a given name,
+ * that will not be autodeleted.
+ *
+ * @param name the name of the queue
+ */
+ Queue(std::string name);
+ /**
+ * Creates a non-durable queue with a given name.
+ *
+ * @param name the name of the queue
+ *
+ * @param temp if true the queue will be a temporary queue, if
+ * false it will be shared and not autodeleted.
+ */
+ Queue(std::string name, bool temp);
+ /**
+ * This constructor allows the autodelete, exclusive and
+ * durable propeties to be explictly set. Note however that if
+ * exclusive is true, autodelete has no meaning as exclusive
+ * queues are always destroyed when the connection that
+ * created them is closed.
+ */
+ Queue(std::string name, bool autodelete, bool exclusive, bool durable);
+ const std::string& getName() const;
+ void setName(const std::string&);
+ bool isAutoDelete() const;
+ bool isExclusive() const;
+ bool isDurable() const;
+ void setDurable(bool durable);
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/client/Connection.cpp b/Final/cpp/lib/client/Connection.cpp
new file mode 100644
index 0000000000..c00b58a4a9
--- /dev/null
+++ b/Final/cpp/lib/client/Connection.cpp
@@ -0,0 +1,250 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <Connection.h>
+#include <ClientChannel.h>
+#include <ClientMessage.h>
+#include <QpidError.h>
+#include <iostream>
+#include <MethodBodyInstances.h>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+using namespace qpid::sys;
+
+Connection::Connection( bool _debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) :
+ debug(_debug),
+ channelIdCounter(0),
+ max_frame_size(_max_frame_size),
+ closed(true),
+ version(_version->getMajor(),_version->getMinor()),
+ tcpNoDelay(false)
+{
+ connector = new Connector(version, debug, _max_frame_size);
+}
+
+Connection::~Connection(){
+ delete connector;
+}
+
+void Connection::setTcpNoDelay(bool on) {
+ tcpNoDelay = on;
+}
+
+void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
+ host = _host;
+ port = _port;
+ connector->setInputHandler(this);
+ connector->setTimeoutHandler(this);
+ connector->setShutdownHandler(this);
+ out = connector->getOutputHandler();
+ connector->connect(host, port, tcpNoDelay);
+
+ ProtocolInitiation* header = new ProtocolInitiation(version);
+ responses.expect();
+ connector->init(header);
+ responses.receive(method_bodies.connection_start);
+
+ FieldTable props;
+ string mechanism("PLAIN");
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ string locale("en_US");
+ responses.expect();
+ out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
+
+ /**
+ * Assume for now that further challenges will not be required
+ //receive connection.secure
+ responses.receive(connection_secure));
+ //send connection.secure-ok
+ out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
+ **/
+
+ responses.receive(method_bodies.connection_tune);
+
+ ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
+ out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
+
+ u_int16_t heartbeat = proposal->getHeartbeat();
+ connector->setReadTimeout(heartbeat * 2);
+ connector->setWriteTimeout(heartbeat);
+
+ //send connection.open
+ string capabilities;
+ string vhost = virtualhost;
+ responses.expect();
+ out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true)));
+ //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
+ responses.waitForResponse();
+ if(responses.validate(method_bodies.connection_open_ok)){
+ //ok
+ }else if(responses.validate(method_bodies.connection_redirect)){
+ //ignore for now
+ ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
+ std::cout << "Received redirection to " << redirect->getHost() << std::endl;
+ }else{
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+ }
+ closed = false;
+}
+
+void Connection::close(){
+ if(!closed){
+ u_int16_t code(200);
+ string text("Ok");
+ u_int16_t classId(0);
+ u_int16_t methodId(0);
+
+ sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
+ connector->close();
+ closed = true;
+ }
+}
+
+void Connection::openChannel(Channel* channel){
+ channel->con = this;
+ channel->id = ++channelIdCounter;
+ channel->out = out;
+ channels[channel->id] = channel;
+ //now send frame to open channel and wait for response
+ string oob;
+ channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
+ channel->setQos();
+ channel->closed = false;
+}
+
+void Connection::closeChannel(Channel* channel){
+ //send frame to close channel
+ u_int16_t code(200);
+ string text("Ok");
+ u_int16_t classId(0);
+ u_int16_t methodId(0);
+ closeChannel(channel, code, text, classId, methodId);
+}
+
+void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
+ //send frame to close channel
+ channel->cancelAll();
+ channel->closed = true;
+ channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
+ channel->con = 0;
+ channel->out = 0;
+ removeChannel(channel);
+}
+
+void Connection::removeChannel(Channel* channel){
+ //send frame to close channel
+
+ channels.erase(channel->id);
+ channel->out = 0;
+ channel->id = 0;
+ channel->con = 0;
+}
+
+void Connection::received(AMQFrame* frame){
+ u_int16_t channelId = frame->getChannel();
+
+ if(channelId == 0){
+ this->handleBody(frame->getBody());
+ }else{
+ Channel* channel = channels[channelId];
+ if(channel == 0){
+ error(504, "Unknown channel");
+ }else{
+ try{
+ channel->handleBody(frame->getBody());
+ }catch(qpid::QpidError e){
+ channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
+ }
+ }
+ }
+}
+
+void Connection::handleMethod(AMQMethodBody::shared_ptr body){
+ //connection.close, basic.deliver, basic.return or a response to a synchronous request
+ if(responses.isWaiting()){
+ responses.signalResponse(body);
+ }else if(method_bodies.connection_close.match(body.get())){
+ //send back close ok
+ //close socket
+ ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
+ std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl;
+ connector->close();
+ }else{
+ std::cout << "Unhandled method for connection: " << *body << std::endl;
+ error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
+ }
+}
+
+void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){
+ error(504, "Channel error: received header body with channel 0.");
+}
+
+void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){
+ error(504, "Channel error: received content body with channel 0.");
+}
+
+void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
+}
+
+void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
+ responses.expect();
+ out->send(frame);
+ responses.receive(body);
+}
+
+void Connection::error(int code, const string& msg, int classid, int methodid){
+ std::cout << "Connection exception generated: " << code << msg;
+ if(classid || methodid){
+ std::cout << " [" << methodid << ":" << classid << "]";
+ }
+ std::cout << std::endl;
+ sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
+ connector->close();
+}
+
+void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
+ std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl;
+ int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
+ string msg = e.msg;
+ if(method == 0){
+ closeChannel(channel, code, msg);
+ }else{
+ closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
+void Connection::idleIn(){
+ std::cout << "Connection timed out due to abscence of heartbeat." << std::endl;
+ connector->close();
+}
+
+void Connection::idleOut(){
+ out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
+}
+
+void Connection::shutdown(){
+ closed = true;
+ //close all channels
+ for(iterator i = channels.begin(); i != channels.end(); i++){
+ i->second->stop();
+ }
+}
diff --git a/Final/cpp/lib/client/Connection.h b/Final/cpp/lib/client/Connection.h
new file mode 100644
index 0000000000..2222250188
--- /dev/null
+++ b/Final/cpp/lib/client/Connection.h
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <map>
+#include <string>
+
+#ifndef _Connection_
+#define _Connection_
+
+#include <QpidError.h>
+#include <Connector.h>
+#include <sys/ShutdownHandler.h>
+#include <sys/TimeoutHandler.h>
+
+#include <framing/amqp_framing.h>
+#include <ClientExchange.h>
+#include <IncomingMessage.h>
+#include <ClientMessage.h>
+#include <MessageListener.h>
+#include <ClientQueue.h>
+#include <ResponseHandler.h>
+#include <AMQP_HighestVersion.h>
+
+namespace qpid {
+
+ /**
+ * The client namespace contains all classes that make up a client
+ * implementation of the AMQP protocol. The key classes that form
+ * the basis of the client API to be used by applications are
+ * Connection and Channel.
+ */
+namespace client {
+
+ class Channel;
+
+ /**
+ * \defgroup clientapi Application API for an AMQP client
+ */
+
+ /**
+ * Represents a connection to an AMQP broker. All communication is
+ * initiated by establishing a connection, then opening one or
+ * more Channels over that connection.
+ *
+ * \ingroup clientapi
+ */
+ class Connection : public virtual qpid::framing::InputHandler,
+ public virtual qpid::sys::TimeoutHandler,
+ public virtual qpid::sys::ShutdownHandler,
+ private virtual qpid::framing::BodyHandler{
+
+ typedef std::map<int, Channel*>::iterator iterator;
+
+ const bool debug;
+ u_int16_t channelIdCounter;
+
+ std::string host;
+ int port;
+ const u_int32_t max_frame_size;
+ std::map<int, Channel*> channels;
+ Connector* connector;
+ qpid::framing::OutputHandler* out;
+ ResponseHandler responses;
+ volatile bool closed;
+ qpid::framing::ProtocolVersion version;
+ bool tcpNoDelay;
+
+ void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
+ void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
+ void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
+ void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
+
+ virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
+ virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
+ virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
+ virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+ public:
+ /**
+ * Creates a connection object, but does not open the
+ * connection.
+ *
+ * @param _version the version of the protocol to connect with
+ *
+ * @param debug turns on tracing for the connection
+ * (i.e. prints details of the frames sent and received to std
+ * out). Optional and defaults to false.
+ *
+ * @param max_frame_size the maximum frame size that the
+ * client will accept. Optional and defaults to 65536.
+ */
+ Connection( bool debug = false, u_int32_t max_frame_size = 65536,
+ qpid::framing::ProtocolVersion* _version = &(qpid::framing::highestProtocolVersion));
+ ~Connection();
+
+ void setTcpNoDelay(bool on);
+
+ /**
+ * Opens a connection to a broker.
+ *
+ * @param host the host on which the broker is running
+ *
+ * @param port the port on the which the broker is listening
+ *
+ * @param uid the userid to connect with
+ *
+ * @param pwd the password to connect with (currently SASL
+ * PLAIN is the only authentication method supported so this
+ * is sent in clear text)
+ *
+ * @param virtualhost the AMQP virtual host to use (virtual
+ * hosts, where implemented(!), provide namespace partitioning
+ * within a single broker).
+ */
+ void open(const std::string& host, int port = 5672,
+ const std::string& uid = "guest", const std::string& pwd = "guest",
+ const std::string& virtualhost = "");
+ /**
+ * Closes the connection. Any further use of this connection
+ * (without reopening it) will not succeed.
+ */
+ void close();
+ /**
+ * Opens a Channel. In AMQP channels are like multi-plexed
+ * 'sessions' of work over a connection. Almost all the
+ * interaction with AMQP is done over a channel.
+ *
+ * @param channel a pointer to a channel instance that will be
+ * used to represent the new channel.
+ */
+ void openChannel(Channel* channel);
+ /*
+ * Requests that the server close this channel, then removes
+ * the association to the channel from this connection
+ *
+ * @param channel a pointer to the channel instance to close
+ */
+ void closeChannel(Channel* channel);
+ /*
+ * Removes the channel from association with this connection,
+ * without sending a close request to the server.
+ *
+ * @param channel a pointer to the channel instance to
+ * disassociate
+ */
+ void removeChannel(Channel* channel);
+
+ virtual void received(qpid::framing::AMQFrame* frame);
+
+ virtual void idleOut();
+ virtual void idleIn();
+
+ virtual void shutdown();
+
+ /**
+ * @return the maximum frame size in use on this connection
+ */
+ inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/client/Connector.cpp b/Final/cpp/lib/client/Connector.cpp
new file mode 100644
index 0000000000..a99360b840
--- /dev/null
+++ b/Final/cpp/lib/client/Connector.cpp
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <QpidError.h>
+#include <sys/Time.h>
+#include "Connector.h"
+
+using namespace qpid::sys;
+using namespace qpid::client;
+using namespace qpid::framing;
+using qpid::QpidError;
+
+Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) :
+ debug(_debug),
+ receive_buffer_size(buffer_size),
+ send_buffer_size(buffer_size),
+ version(pVersion),
+ closed(true),
+ lastIn(0), lastOut(0),
+ timeout(0),
+ idleIn(0), idleOut(0),
+ timeoutHandler(0),
+ shutdownHandler(0),
+ inbuf(receive_buffer_size),
+ outbuf(send_buffer_size){ }
+
+Connector::~Connector(){ }
+
+void Connector::connect(const std::string& host, int port, bool tcpNoDelay){
+ socket = Socket::createTcp();
+ if (tcpNoDelay) {
+ socket.setTcpNoDelay(true);
+ }
+ socket.connect(host, port);
+ closed = false;
+ receiver = Thread(this);
+}
+
+void Connector::init(ProtocolInitiation* header){
+ writeBlock(header);
+ delete header;
+}
+
+void Connector::close(){
+ if (markClosed()) {
+ socket.close();
+ receiver.join();
+ }
+}
+
+void Connector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void Connector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+OutputHandler* Connector::getOutputHandler(){
+ return this;
+}
+
+void Connector::send(AMQFrame* frame){
+ writeBlock(frame);
+ if(debug) std::cout << "SENT: " << *frame << std::endl;
+ delete frame;
+}
+
+void Connector::writeBlock(AMQDataBlock* data){
+ Mutex::ScopedLock l(writeLock);
+ data->encode(outbuf);
+ //transfer data to wire
+ outbuf.flip();
+ writeToSocket(outbuf.start(), outbuf.available());
+ outbuf.clear();
+}
+
+void Connector::writeToSocket(char* data, size_t available){
+ size_t written = 0;
+ while(written < available && !closed){
+ ssize_t sent = socket.send(data + written, available-written);
+ if(sent > 0) {
+ lastOut = now() * TIME_MSEC;
+ written += sent;
+ }
+ }
+}
+
+void Connector::handleClosed(){
+ if (markClosed()) {
+ socket.close();
+ if(shutdownHandler) shutdownHandler->shutdown();
+ }
+}
+
+bool Connector::markClosed(){
+ if (closed) {
+ return false;
+ } else {
+ closed = true;
+ return true;
+ }
+}
+
+void Connector::checkIdle(ssize_t status){
+ if(timeoutHandler){
+ Time t = now() * TIME_MSEC;
+ if(status == Socket::SOCKET_TIMEOUT) {
+ if(idleIn && (t - lastIn > idleIn)){
+ timeoutHandler->idleIn();
+ }
+ }else if(status == Socket::SOCKET_EOF){
+ handleClosed();
+ }else{
+ lastIn = t;
+ }
+ if(idleOut && (t - lastOut > idleOut)){
+ timeoutHandler->idleOut();
+ }
+ }
+}
+
+void Connector::setReadTimeout(u_int16_t t){
+ idleIn = t * 1000;//t is in secs
+ if(idleIn && (!timeout || idleIn < timeout)){
+ timeout = idleIn;
+ setSocketTimeout();
+ }
+
+}
+
+void Connector::setWriteTimeout(u_int16_t t){
+ idleOut = t * 1000;//t is in secs
+ if(idleOut && (!timeout || idleOut < timeout)){
+ timeout = idleOut;
+ setSocketTimeout();
+ }
+}
+
+void Connector::setSocketTimeout(){
+ socket.setTimeout(timeout*TIME_MSEC);
+}
+
+void Connector::setTimeoutHandler(TimeoutHandler* handler){
+ timeoutHandler = handler;
+}
+
+void Connector::run(){
+ try{
+ while(!closed){
+ ssize_t available = inbuf.available();
+ if(available < 1){
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+ }
+ ssize_t received = socket.recv(inbuf.start(), available);
+ checkIdle(received);
+
+ if(!closed && received > 0){
+ inbuf.move(received);
+ inbuf.flip();//position = 0, limit = total data read
+
+ AMQFrame frame(version);
+ while(frame.decode(inbuf)){
+ if(debug) std::cout << "RECV: " << frame << std::endl;
+ input->received(&frame);
+ }
+ //need to compact buffer to preserve any 'extra' data
+ inbuf.compact();
+ }
+ }
+ }catch(QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg
+ << " (" << error.location.file << ":" << error.location.line
+ << ")" << std::endl;
+ handleClosed();
+ }
+}
diff --git a/Final/cpp/lib/client/Connector.h b/Final/cpp/lib/client/Connector.h
new file mode 100644
index 0000000000..44112369dc
--- /dev/null
+++ b/Final/cpp/lib/client/Connector.h
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Connector_
+#define _Connector_
+
+
+#include <framing/InputHandler.h>
+#include <framing/OutputHandler.h>
+#include <framing/InitiationHandler.h>
+#include <framing/ProtocolInitiation.h>
+#include <ProtocolVersion.h>
+#include <sys/ShutdownHandler.h>
+#include <sys/TimeoutHandler.h>
+#include <sys/Thread.h>
+#include <sys/Monitor.h>
+#include <sys/Socket.h>
+
+namespace qpid {
+namespace client {
+
+ class Connector : public qpid::framing::OutputHandler,
+ private qpid::sys::Runnable
+ {
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+ qpid::framing::ProtocolVersion version;
+
+ volatile bool closed;
+
+ int64_t lastIn;
+ int64_t lastOut;
+ int64_t timeout;
+ u_int32_t idleIn;
+ u_int32_t idleOut;
+
+ qpid::sys::TimeoutHandler* timeoutHandler;
+ qpid::sys::ShutdownHandler* shutdownHandler;
+ qpid::framing::InputHandler* input;
+ qpid::framing::InitiationHandler* initialiser;
+ qpid::framing::OutputHandler* output;
+
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+
+ qpid::sys::Mutex writeLock;
+ qpid::sys::Thread receiver;
+
+ qpid::sys::Socket socket;
+
+ void checkIdle(ssize_t status);
+ void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeToSocket(char* data, size_t available);
+ void setSocketTimeout();
+
+ void run();
+ void handleClosed();
+ bool markClosed();
+
+ public:
+ Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024);
+ virtual ~Connector();
+ virtual void connect(const std::string& host, int port, bool tcpNoDelay=false);
+ virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void close();
+ virtual void setInputHandler(qpid::framing::InputHandler* handler);
+ virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
+ virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
+ virtual qpid::framing::OutputHandler* getOutputHandler();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setReadTimeout(u_int16_t timeout);
+ virtual void setWriteTimeout(u_int16_t timeout);
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/client/IncomingMessage.cpp b/Final/cpp/lib/client/IncomingMessage.cpp
new file mode 100644
index 0000000000..2ff143ba94
--- /dev/null
+++ b/Final/cpp/lib/client/IncomingMessage.cpp
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <IncomingMessage.h>
+#include <QpidError.h>
+#include <iostream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){}
+IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){}
+IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){}
+
+IncomingMessage::~IncomingMessage(){
+}
+
+void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){
+ this->header = _header;
+}
+
+void IncomingMessage::addContent(AMQContentBody::shared_ptr _content){
+ this->content.push_back(_content);
+}
+
+bool IncomingMessage::isComplete(){
+ return header != 0 && header->getContentSize() == contentSize();
+}
+
+bool IncomingMessage::isReturn(){
+ return returned;
+}
+
+bool IncomingMessage::isDelivery(){
+ return delivered;
+}
+
+bool IncomingMessage::isResponse(){
+ return response;
+}
+
+const string& IncomingMessage::getConsumerTag(){
+ if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery");
+ return delivered->getConsumerTag();
+}
+
+u_int64_t IncomingMessage::getDeliveryTag(){
+ if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery");
+ return delivered->getDeliveryTag();
+}
+
+AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){
+ return header;
+}
+
+void IncomingMessage::getData(string& s){
+ int count(content.size());
+ for(int i = 0; i < count; i++){
+ if(i == 0) s = content[i]->getData();
+ else s += content[i]->getData();
+ }
+}
+
+u_int64_t IncomingMessage::contentSize(){
+ u_int64_t size(0);
+ u_int64_t count(content.size());
+ for(u_int64_t i = 0; i < count; i++){
+ size += content[i]->size();
+ }
+ return size;
+}
diff --git a/Final/cpp/lib/client/IncomingMessage.h b/Final/cpp/lib/client/IncomingMessage.h
new file mode 100644
index 0000000000..464e05d877
--- /dev/null
+++ b/Final/cpp/lib/client/IncomingMessage.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <vector>
+#include <framing/amqp_framing.h>
+
+#ifndef _IncomingMessage_
+#define _IncomingMessage_
+
+#include <ClientMessage.h>
+
+namespace qpid {
+namespace client {
+
+ class IncomingMessage{
+ //content will be preceded by one of these method frames
+ qpid::framing::BasicDeliverBody::shared_ptr delivered;
+ qpid::framing::BasicReturnBody::shared_ptr returned;
+ qpid::framing::BasicGetOkBody::shared_ptr response;
+ qpid::framing::AMQHeaderBody::shared_ptr header;
+ std::vector<qpid::framing::AMQContentBody::shared_ptr> content;
+
+ u_int64_t contentSize();
+ public:
+ IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro);
+ IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro);
+ IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro);
+ ~IncomingMessage();
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr content);
+ bool isComplete();
+ bool isReturn();
+ bool isDelivery();
+ bool isResponse();
+ const std::string& getConsumerTag();//only relevant if isDelivery()
+ qpid::framing::AMQHeaderBody::shared_ptr& getHeader();
+ u_int64_t getDeliveryTag();
+ void getData(std::string& data);
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/client/Makefile.am b/Final/cpp/lib/client/Makefile.am
new file mode 100644
index 0000000000..9935da654a
--- /dev/null
+++ b/Final/cpp/lib/client/Makefile.am
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+AM_CXXFLAGS = $(WARNING_CFLAGS)
+INCLUDES = \
+ -I$(top_srcdir)/gen \
+ -I$(top_srcdir)/lib/common \
+ -I$(top_srcdir)/lib/common/sys \
+ -I$(top_srcdir)/lib/common/framing \
+ $(APR_CXXFLAGS)
+
+lib_LTLIBRARIES = libqpidclient.la
+libqpidclient_la_LIBADD = ../common/libqpidcommon.la
+libqpidclient_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
+libqpidclient_la_SOURCES = \
+ ClientChannel.cpp \
+ ClientExchange.cpp \
+ ClientMessage.cpp \
+ ClientQueue.cpp \
+ Connection.cpp \
+ Connector.cpp \
+ IncomingMessage.cpp \
+ MessageListener.cpp \
+ ResponseHandler.cpp \
+ ReturnedMessageHandler.cpp
+pkginclude_HEADERS = \
+ ClientChannel.h \
+ ClientExchange.h \
+ ClientMessage.h \
+ ClientQueue.h \
+ Connection.h \
+ Connector.h \
+ IncomingMessage.h \
+ MessageListener.h \
+ MethodBodyInstances.h \
+ ResponseHandler.h \
+ ReturnedMessageHandler.h
diff --git a/Final/cpp/lib/client/MessageListener.cpp b/Final/cpp/lib/client/MessageListener.cpp
new file mode 100644
index 0000000000..70d44e7040
--- /dev/null
+++ b/Final/cpp/lib/client/MessageListener.cpp
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <MessageListener.h>
+
+qpid::client::MessageListener::~MessageListener() {}
diff --git a/Final/cpp/lib/client/MessageListener.h b/Final/cpp/lib/client/MessageListener.h
new file mode 100644
index 0000000000..cfb917b4f8
--- /dev/null
+++ b/Final/cpp/lib/client/MessageListener.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _MessageListener_
+#define _MessageListener_
+
+#include <ClientMessage.h>
+
+namespace qpid {
+namespace client {
+
+ /**
+ * An interface through which asynchronously delivered messages
+ * can be received by an application.
+ *
+ * @see Channel::consume()
+ *
+ * \ingroup clientapi
+ */
+ class MessageListener{
+ public:
+ virtual ~MessageListener();
+ virtual void received(Message& msg) = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/client/MethodBodyInstances.h b/Final/cpp/lib/client/MethodBodyInstances.h
new file mode 100644
index 0000000000..3ab0c9af8f
--- /dev/null
+++ b/Final/cpp/lib/client/MethodBodyInstances.h
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <framing/amqp_framing.h>
+
+#ifndef _MethodBodyInstances_h_
+#define _MethodBodyInstances_h_
+
+namespace qpid {
+namespace client {
+
+/**
+ * A list of method body instances that can be used to compare against
+ * incoming bodies.
+ */
+class MethodBodyInstances
+{
+private:
+ qpid::framing::ProtocolVersion version;
+public:
+ const qpid::framing::BasicCancelOkBody basic_cancel_ok;
+ const qpid::framing::BasicConsumeOkBody basic_consume_ok;
+ const qpid::framing::BasicDeliverBody basic_deliver;
+ const qpid::framing::BasicGetEmptyBody basic_get_empty;
+ const qpid::framing::BasicGetOkBody basic_get_ok;
+ const qpid::framing::BasicQosOkBody basic_qos_ok;
+ const qpid::framing::BasicReturnBody basic_return;
+ const qpid::framing::ChannelCloseBody channel_close;
+ const qpid::framing::ChannelCloseOkBody channel_close_ok;
+ const qpid::framing::ChannelFlowBody channel_flow;
+ const qpid::framing::ChannelOpenOkBody channel_open_ok;
+ const qpid::framing::ConnectionCloseBody connection_close;
+ const qpid::framing::ConnectionCloseOkBody connection_close_ok;
+ const qpid::framing::ConnectionOpenOkBody connection_open_ok;
+ const qpid::framing::ConnectionRedirectBody connection_redirect;
+ const qpid::framing::ConnectionStartBody connection_start;
+ const qpid::framing::ConnectionTuneBody connection_tune;
+ const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok;
+ const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok;
+ const qpid::framing::QueueDeclareOkBody queue_declare_ok;
+ const qpid::framing::QueueDeleteOkBody queue_delete_ok;
+ const qpid::framing::QueueBindOkBody queue_bind_ok;
+ const qpid::framing::TxCommitOkBody tx_commit_ok;
+ const qpid::framing::TxRollbackOkBody tx_rollback_ok;
+ const qpid::framing::TxSelectOkBody tx_select_ok;
+
+ MethodBodyInstances(u_int8_t major, u_int8_t minor) :
+ version(major, minor),
+ basic_cancel_ok(version),
+ basic_consume_ok(version),
+ basic_deliver(version),
+ basic_get_empty(version),
+ basic_get_ok(version),
+ basic_qos_ok(version),
+ basic_return(version),
+ channel_close(version),
+ channel_close_ok(version),
+ channel_flow(version),
+ channel_open_ok(version),
+ connection_close(version),
+ connection_close_ok(version),
+ connection_open_ok(version),
+ connection_redirect(version),
+ connection_start(version),
+ connection_tune(version),
+ exchange_declare_ok(version),
+ exchange_delete_ok(version),
+ queue_declare_ok(version),
+ queue_delete_ok(version),
+ queue_bind_ok(version),
+ tx_commit_ok(version),
+ tx_rollback_ok(version),
+ tx_select_ok(version)
+ {}
+
+};
+
+static MethodBodyInstances method_bodies(8, 0);
+
+} // namespace client
+} // namespace qpid
+
+#endif
diff --git a/Final/cpp/lib/client/ResponseHandler.cpp b/Final/cpp/lib/client/ResponseHandler.cpp
new file mode 100644
index 0000000000..ac8b4a9ced
--- /dev/null
+++ b/Final/cpp/lib/client/ResponseHandler.cpp
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ResponseHandler.h>
+#include <sys/Monitor.h>
+#include <QpidError.h>
+
+using namespace qpid::sys;
+
+qpid::client::ResponseHandler::ResponseHandler() : waiting(false){}
+
+qpid::client::ResponseHandler::~ResponseHandler(){}
+
+bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
+ return expected.match(response.get());
+}
+
+void qpid::client::ResponseHandler::waitForResponse(){
+ Monitor::ScopedLock l(monitor);
+ if(waiting){
+ monitor.wait();
+ }
+}
+
+void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){
+ response = _response;
+ Monitor::ScopedLock l(monitor);
+ waiting = false;
+ monitor.notify();
+}
+
+void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
+ Monitor::ScopedLock l(monitor);
+ if(waiting){
+ monitor.wait();
+ }
+ if(!validate(expected)){
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
+ }
+}
+
+void qpid::client::ResponseHandler::expect(){
+ waiting = true;
+}
diff --git a/Final/cpp/lib/client/ResponseHandler.h b/Final/cpp/lib/client/ResponseHandler.h
new file mode 100644
index 0000000000..c3d499d046
--- /dev/null
+++ b/Final/cpp/lib/client/ResponseHandler.h
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <framing/amqp_framing.h>
+#include <sys/Monitor.h>
+
+#ifndef _ResponseHandler_
+#define _ResponseHandler_
+
+namespace qpid {
+ namespace client {
+
+ class ResponseHandler{
+ bool waiting;
+ qpid::framing::AMQMethodBody::shared_ptr response;
+ qpid::sys::Monitor monitor;
+
+ public:
+ ResponseHandler();
+ ~ResponseHandler();
+ inline bool isWaiting(){ return waiting; }
+ inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; }
+ bool validate(const qpid::framing::AMQMethodBody& expected);
+ void waitForResponse();
+ void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response);
+ void receive(const qpid::framing::AMQMethodBody& expected);
+ void expect();//must be called before calling receive
+ };
+
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/client/ReturnedMessageHandler.cpp b/Final/cpp/lib/client/ReturnedMessageHandler.cpp
new file mode 100644
index 0000000000..ee9f7462ef
--- /dev/null
+++ b/Final/cpp/lib/client/ReturnedMessageHandler.cpp
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <ReturnedMessageHandler.h>
+
+qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {}
diff --git a/Final/cpp/lib/client/ReturnedMessageHandler.h b/Final/cpp/lib/client/ReturnedMessageHandler.h
new file mode 100644
index 0000000000..137f0b2e17
--- /dev/null
+++ b/Final/cpp/lib/client/ReturnedMessageHandler.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _ReturnedMessageHandler_
+#define _ReturnedMessageHandler_
+
+#include <ClientMessage.h>
+
+namespace qpid {
+namespace client {
+
+ /**
+ * An interface through which returned messages can be received by
+ * an application.
+ *
+ * @see Channel::setReturnedMessageHandler()
+ *
+ * \ingroup clientapi
+ */
+ class ReturnedMessageHandler{
+ public:
+ virtual ~ReturnedMessageHandler();
+ virtual void returned(Message& msg) = 0;
+ };
+
+}
+}
+
+
+#endif