summaryrefslogtreecommitdiff
path: root/cpp/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/client/src')
-rw-r--r--cpp/client/src/Channel.cpp432
-rw-r--r--cpp/client/src/Connection.cpp237
-rw-r--r--cpp/client/src/Exchange.cpp30
-rw-r--r--cpp/client/src/IncomingMessage.cpp85
-rw-r--r--cpp/client/src/Message.cpp147
-rw-r--r--cpp/client/src/Queue.cpp47
-rw-r--r--cpp/client/src/ResponseHandler.cpp63
7 files changed, 1041 insertions, 0 deletions
diff --git a/cpp/client/src/Channel.cpp b/cpp/client/src/Channel.cpp
new file mode 100644
index 0000000000..e965f7e5dd
--- /dev/null
+++ b/cpp/client/src/Channel.cpp
@@ -0,0 +1,432 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Channel.h"
+#include "MonitorImpl.h"
+#include "ThreadFactoryImpl.h"
+#include "Message.h"
+#include "QpidError.h"
+
+using namespace std::tr1;//to use dynamic_pointer_cast
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+Channel::Channel(bool _transactional, u_int16_t _prefetch) : id(0), incoming(0), con(0), out(0),
+ prefetch(_prefetch),
+ transactional(_transactional),
+ dispatcher(0),
+ closed(true){
+ threadFactory = new ThreadFactoryImpl();
+ dispatchMonitor = new MonitorImpl();
+ retrievalMonitor = new MonitorImpl();
+}
+
+Channel::~Channel(){
+ if(dispatcher){
+ stop();
+ delete dispatcher;
+ }
+ delete retrievalMonitor;
+ delete dispatchMonitor;
+ delete threadFactory;
+}
+
+void Channel::setPrefetch(u_int16_t prefetch){
+ this->prefetch = prefetch;
+ if(con != 0 && out != 0){
+ setQos();
+ }
+}
+
+void Channel::setQos(){
+ sendAndReceive(new AMQFrame(id, new BasicQosBody(0, prefetch, false)), basic_qos_ok);
+ if(transactional){
+ sendAndReceive(new AMQFrame(id, new TxSelectBody()), tx_select_ok);
+ }
+}
+
+void Channel::declareExchange(Exchange& exchange, bool synch){
+ string name = exchange.getName();
+ string type = exchange.getType();
+ FieldTable args;
+ AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(0, name, type, false, false, false, false, !synch, args));
+ if(synch){
+ sendAndReceive(frame, exchange_declare_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::deleteExchange(Exchange& exchange, bool synch){
+ string name = exchange.getName();
+ AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(0, name, false, !synch));
+ if(synch){
+ sendAndReceive(frame, exchange_delete_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::declareQueue(Queue& queue, bool synch){
+ string name = queue.getName();
+ FieldTable args;
+ AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(0, name, false, false,
+ queue.isExclusive(),
+ queue.isAutoDelete(), !synch, args));
+ if(synch){
+ sendAndReceive(frame, queue_declare_ok);
+ if(queue.getName().length() == 0){
+ QueueDeclareOkBody::shared_ptr response =
+ dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
+ queue.setName(response->getQueue());
+ }
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
+ //ticket, queue, ifunused, ifempty, nowait
+ string name = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(0, name, ifunused, ifempty, !synch));
+ if(synch){
+ sendAndReceive(frame, queue_delete_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
+ string e = exchange.getName();
+ string q = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new QueueBindBody(0, q, e, (string&) key,!synch, (FieldTable&) args));
+ if(synch){
+ sendAndReceive(frame, queue_bind_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener,
+ int ackMode, bool noLocal, bool synch){
+
+ string q = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch));
+ if(synch){
+ sendAndReceive(frame, basic_consume_ok);
+ BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
+ tag = response->getConsumerTag();
+ }else{
+ out->send(frame);
+ }
+ Consumer* c = new Consumer();
+ c->listener = listener;
+ c->ackMode = ackMode;
+ c->lastDeliveryTag = 0;
+ consumers[tag] = c;
+}
+
+void Channel::cancel(std::string& tag, bool synch){
+ Consumer* c = consumers[tag];
+ if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
+ out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
+ }
+
+ AMQFrame* frame = new AMQFrame(id, new BasicCancelBody((string&) tag, !synch));
+ if(synch){
+ sendAndReceive(frame, basic_cancel_ok);
+ }else{
+ out->send(frame);
+ }
+ consumers.erase(tag);
+ if(c != 0){
+ delete c;
+ }
+}
+
+void Channel::cancelAll(){
+ int count(consumers.size());
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
+ Consumer* c = i->second;
+ if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
+ out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
+ }
+ consumers.erase(i);
+ delete c;
+ }
+}
+
+void Channel::retrieve(Message& msg){
+ retrievalMonitor->acquire();
+ while(retrieved == 0){
+ retrievalMonitor->wait();
+ }
+
+ msg.header = retrieved->getHeader();
+ msg.deliveryTag = retrieved->getDeliveryTag();
+ retrieved->getData(msg.data);
+ delete retrieved;
+ retrieved = 0;
+
+ retrievalMonitor->release();
+}
+
+bool Channel::get(Message& msg, const Queue& queue, int ackMode){
+ string name = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new BasicGetBody(0, name, ackMode));
+ responses.expect();
+ out->send(frame);
+ responses.waitForResponse();
+ AMQMethodBody::shared_ptr response = responses.getResponse();
+ if(basic_get_ok.match(response.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
+ }
+ retrieve(msg);
+ return true;
+ }if(basic_get_empty.match(response.get())){
+ return false;
+ }else{
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
+ }
+}
+
+
+void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
+ string e = exchange.getName();
+ string key = routingKey;
+
+ out->send(new AMQFrame(id, new BasicPublishBody(0, e, key, mandatory, immediate)));
+ //break msg up into header frame and content frame(s) and send these
+ string data = msg.getData();
+ msg.header->setContentSize(data.length());
+ AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
+ out->send(new AMQFrame(id, body));
+
+ int data_length = data.length();
+ if(data_length > 0){
+ //TODO fragmentation of messages, need to know max frame size for connection
+ int frag_size = con->getMaxFrameSize() - 4;
+ if(data_length < frag_size){
+ out->send(new AMQFrame(id, new AMQContentBody(data)));
+ }else{
+ int frag_count = data_length / frag_size;
+ for(int i = 0; i < frag_count; i++){
+ int pos = i*frag_size;
+ int len = i < frag_count - 1 ? frag_size : data_length - pos;
+ string frag(data.substr(pos, len));
+ out->send(new AMQFrame(id, new AMQContentBody(frag)));
+ }
+ }
+ }
+}
+
+void Channel::commit(){
+ AMQFrame* frame = new AMQFrame(id, new TxCommitBody());
+ sendAndReceive(frame, tx_commit_ok);
+}
+
+void Channel::rollback(){
+ AMQFrame* frame = new AMQFrame(id, new TxRollbackBody());
+ sendAndReceive(frame, tx_rollback_ok);
+}
+
+void Channel::handleMethod(AMQMethodBody::shared_ptr body){
+ //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
+ if(responses.isWaiting()){
+ responses.signalResponse(body);
+ }else if(basic_deliver.match(body.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
+ }
+ }else if(basic_return.match(body.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
+ }
+ }else if(channel_close.match(body.get())){
+ con->removeChannel(this);
+ //need to signal application that channel has been closed through exception
+
+ }else if(channel_flow.match(body.get())){
+
+ }else{
+ //signal error
+ std::cout << "Unhandled method: " << *body << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
+ }
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
+ if(incoming == 0){
+ //handle invalid frame sequence
+ std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
+ }else{
+ incoming->setHeader(body);
+ if(incoming->isComplete()){
+ enqueue();
+ }
+ }
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr body){
+ if(incoming == 0){
+ //handle invalid frame sequence
+ std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
+ }else{
+ incoming->addContent(body);
+ if(incoming->isComplete()){
+ enqueue();
+ }
+ }
+}
+
+void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
+}
+
+void Channel::start(){
+ dispatcher = threadFactory->create(this);
+ dispatcher->start();
+}
+
+void Channel::stop(){
+ closed = true;
+ dispatchMonitor->acquire();
+ dispatchMonitor->notify();
+ dispatchMonitor->release();
+ if(dispatcher){
+ dispatcher->join();
+ }
+}
+
+void Channel::run(){
+ dispatch();
+}
+
+void Channel::enqueue(){
+ if(incoming->isResponse()){
+ retrievalMonitor->acquire();
+ retrieved = incoming;
+ retrievalMonitor->notify();
+ retrievalMonitor->release();
+ }else{
+ dispatchMonitor->acquire();
+ messages.push(incoming);
+ dispatchMonitor->notify();
+ dispatchMonitor->release();
+ }
+ incoming = 0;
+}
+
+IncomingMessage* Channel::dequeue(){
+ dispatchMonitor->acquire();
+ while(messages.empty() && !closed){
+ dispatchMonitor->wait();
+ }
+ IncomingMessage* msg = 0;
+ if(!messages.empty()){
+ msg = messages.front();
+ messages.pop();
+ }
+ dispatchMonitor->release();
+ return msg;
+}
+
+void Channel::deliver(Consumer* consumer, Message& msg){
+ //record delivery tag:
+ consumer->lastDeliveryTag = msg.getDeliveryTag();
+
+ //allow registered listener to handle the message
+ consumer->listener->received(msg);
+
+ //if the handler calls close on the channel or connection while
+ //handling this message, then consumer will now have been deleted.
+ if(!closed){
+ bool multiple(false);
+ switch(consumer->ackMode){
+ case LAZY_ACK:
+ multiple = true;
+ if(++(consumer->count) < prefetch) break;
+ //else drop-through
+ case AUTO_ACK:
+ out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple)));
+ consumer->lastDeliveryTag = 0;
+ }
+ }
+
+ //as it stands, transactionality is entirely orthogonal to ack
+ //mode, though the acks will not be processed by the broker under
+ //a transaction until it commits.
+}
+
+void Channel::dispatch(){
+ while(!closed){
+ IncomingMessage* incomingMsg = dequeue();
+ if(incomingMsg){
+ //Note: msg is currently only valid for duration of this call
+ Message msg(incomingMsg->getHeader());
+ incomingMsg->getData(msg.data);
+ if(incomingMsg->isReturn()){
+ if(returnsHandler == 0){
+ //print warning to log/console
+ std::cout << "Message returned: " << msg.getData() << std::endl;
+ }else{
+ returnsHandler->returned(msg);
+ }
+ }else{
+ msg.deliveryTag = incomingMsg->getDeliveryTag();
+ std::string tag = incomingMsg->getConsumerTag();
+
+ if(consumers[tag] == 0){
+ //signal error
+ std::cout << "Unknown consumer: " << tag << std::endl;
+ }else{
+ deliver(consumers[tag], msg);
+ }
+ }
+ delete incomingMsg;
+ }
+ }
+}
+
+void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+ returnsHandler = handler;
+}
+
+void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
+ responses.expect();
+ out->send(frame);
+ responses.receive(body);
+}
+
+void Channel::close(){
+ if(con != 0){
+ con->closeChannel(this);
+ }
+}
diff --git a/cpp/client/src/Connection.cpp b/cpp/client/src/Connection.cpp
new file mode 100644
index 0000000000..eeb2330561
--- /dev/null
+++ b/cpp/client/src/Connection.cpp
@@ -0,0 +1,237 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Connection.h"
+#include "Channel.h"
+#include "ConnectorImpl.h"
+#include "Message.h"
+#include "QpidError.h"
+#include <iostream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::io;
+using namespace qpid::concurrent;
+
+u_int16_t Connection::channelIdCounter;
+
+Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){
+ connector = new ConnectorImpl(debug, _max_frame_size);
+}
+
+Connection::~Connection(){
+ delete connector;
+}
+
+void Connection::open(const std::string& host, int port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
+ this->host = host;
+ this->port = port;
+ connector->setInputHandler(this);
+ connector->setTimeoutHandler(this);
+ connector->setShutdownHandler(this);
+ out = connector->getOutputHandler();
+ connector->connect(host, port);
+
+ ProtocolInitiation* header = new ProtocolInitiation(8, 0);
+ responses.expect();
+ connector->init(header);
+ responses.receive(connection_start);
+
+ FieldTable props;
+ string mechanism("PLAIN");
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ string locale("en_US");
+ responses.expect();
+ out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale)));
+
+ /**
+ * Assume for now that further challenges will not be required
+ //receive connection.secure
+ responses.receive(connection_secure));
+ //send connection.secure-ok
+ out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
+ **/
+
+ responses.receive(connection_tune);
+
+ ConnectionTuneBody::shared_ptr proposal = std::tr1::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
+ out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
+
+ u_int16_t heartbeat = proposal->getHeartbeat();
+ connector->setReadTimeout(heartbeat * 2);
+ connector->setWriteTimeout(heartbeat);
+
+ //send connection.open
+ string capabilities;
+ string vhost = virtualhost;
+ responses.expect();
+ out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true)));
+ //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
+ responses.waitForResponse();
+ if(responses.validate(connection_open_ok)){
+ //ok
+ }else if(responses.validate(connection_redirect)){
+ //ignore for now
+ ConnectionRedirectBody::shared_ptr redirect(std::tr1::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
+ std::cout << "Received redirection to " << redirect->getHost() << std::endl;
+ }else{
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+ }
+
+}
+
+void Connection::close(){
+ if(!closed){
+ u_int16_t code(200);
+ string text("Ok");
+ u_int16_t classId(0);
+ u_int16_t methodId(0);
+
+ sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok);
+ connector->close();
+ }
+}
+
+void Connection::openChannel(Channel* channel){
+ channel->con = this;
+ channel->id = ++channelIdCounter;
+ channel->out = out;
+ channels[channel->id] = channel;
+ //now send frame to open channel and wait for response
+ string oob;
+ channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok);
+ channel->setQos();
+ channel->closed = false;
+}
+
+void Connection::closeChannel(Channel* channel){
+ //send frame to close channel
+ u_int16_t code(200);
+ string text("Ok");
+ u_int16_t classId(0);
+ u_int16_t methodId(0);
+ closeChannel(channel, code, text, classId, methodId);
+}
+
+void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
+ //send frame to close channel
+ channel->cancelAll();
+ channel->closed = true;
+ channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok);
+ channel->con = 0;
+ channel->out = 0;
+ removeChannel(channel);
+}
+
+void Connection::removeChannel(Channel* channel){
+ //send frame to close channel
+
+ channels.erase(channel->id);
+ channel->out = 0;
+ channel->id = 0;
+ channel->con = 0;
+}
+
+void Connection::received(AMQFrame* frame){
+ u_int16_t channelId = frame->getChannel();
+
+ if(channelId == 0){
+ this->handleBody(frame->getBody());
+ }else{
+ Channel* channel = channels[channelId];
+ if(channel == 0){
+ error(504, "Unknown channel");
+ }else{
+ try{
+ channel->handleBody(frame->getBody());
+ }catch(qpid::QpidError e){
+ channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
+ }
+ }
+ }
+}
+
+void Connection::handleMethod(AMQMethodBody::shared_ptr body){
+ //connection.close, basic.deliver, basic.return or a response to a synchronous request
+ if(responses.isWaiting()){
+ responses.signalResponse(body);
+ }else if(connection_close.match(body.get())){
+ //send back close ok
+ //close socket
+ ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
+ std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl;
+ connector->close();
+ }else{
+ std::cout << "Unhandled method for connection: " << *body << std::endl;
+ error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
+ }
+}
+
+void Connection::handleHeader(AMQHeaderBody::shared_ptr body){
+ error(504, "Channel error: received header body with channel 0.");
+}
+
+void Connection::handleContent(AMQContentBody::shared_ptr body){
+ error(504, "Channel error: received content body with channel 0.");
+}
+
+void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){
+}
+
+void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
+ responses.expect();
+ out->send(frame);
+ responses.receive(body);
+}
+
+void Connection::error(int code, const string& msg, int classid, int methodid){
+ std::cout << "Connection exception generated: " << code << msg;
+ if(classid || methodid){
+ std::cout << " [" << methodid << ":" << classid << "]";
+ }
+ std::cout << std::endl;
+ sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, (string&) msg, classid, methodid)), connection_close_ok);
+ connector->close();
+}
+
+void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
+ std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.file << ":" << e.line << ")" << std::endl;
+ int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
+ string msg = e.msg;
+ if(method == 0){
+ closeChannel(channel, code, msg);
+ }else{
+ closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
+void Connection::idleIn(){
+ std::cout << "Connection timed out due to abscence of heartbeat." << std::endl;
+ connector->close();
+}
+
+void Connection::idleOut(){
+ out->send(new AMQFrame(0, new AMQHeartbeatBody()));
+}
+
+void Connection::shutdown(){
+ closed = true;
+ //close all channels
+ for(iterator i = channels.begin(); i != channels.end(); i++){
+ i->second->stop();
+ }
+}
diff --git a/cpp/client/src/Exchange.cpp b/cpp/client/src/Exchange.cpp
new file mode 100644
index 0000000000..681068dc4c
--- /dev/null
+++ b/cpp/client/src/Exchange.cpp
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Exchange.h"
+
+qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){}
+const std::string& qpid::client::Exchange::getName() const { return name; }
+const std::string& qpid::client::Exchange::getType() const { return type; }
+
+const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct";
+const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic";
+const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers";
+
+const qpid::client::Exchange qpid::client::Exchange::DEFAULT_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::DEFAULT_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::DEFAULT_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE);
diff --git a/cpp/client/src/IncomingMessage.cpp b/cpp/client/src/IncomingMessage.cpp
new file mode 100644
index 0000000000..8e2604c4cb
--- /dev/null
+++ b/cpp/client/src/IncomingMessage.cpp
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "IncomingMessage.h"
+#include "QpidError.h"
+#include <iostream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){}
+IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){}
+IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){}
+
+IncomingMessage::~IncomingMessage(){
+}
+
+void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr header){
+ this->header = header;
+}
+
+void IncomingMessage::addContent(AMQContentBody::shared_ptr content){
+ this->content.push_back(content);
+}
+
+bool IncomingMessage::isComplete(){
+ return header != 0 && header->getContentSize() == contentSize();
+}
+
+bool IncomingMessage::isReturn(){
+ return returned;
+}
+
+bool IncomingMessage::isDelivery(){
+ return delivered;
+}
+
+bool IncomingMessage::isResponse(){
+ return response;
+}
+
+string& IncomingMessage::getConsumerTag(){
+ if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery");
+ return delivered->getConsumerTag();
+}
+
+u_int64_t IncomingMessage::getDeliveryTag(){
+ if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery");
+ return delivered->getDeliveryTag();
+}
+
+AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){
+ return header;
+}
+
+void IncomingMessage::getData(string& s){
+ int count(content.size());
+ for(int i = 0; i < count; i++){
+ if(i == 0) s = content[i]->getData();
+ else s += content[i]->getData();
+ }
+}
+
+long IncomingMessage::contentSize(){
+ long size(0);
+ int count(content.size());
+ for(int i = 0; i < count; i++){
+ size += content[i]->size();
+ }
+ return size;
+}
diff --git a/cpp/client/src/Message.cpp b/cpp/client/src/Message.cpp
new file mode 100644
index 0000000000..71befe57b1
--- /dev/null
+++ b/cpp/client/src/Message.cpp
@@ -0,0 +1,147 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Message.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+Message::Message(){
+ header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC));
+}
+
+Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){
+}
+
+Message::~Message(){
+}
+
+BasicHeaderProperties* Message::getHeaderProperties(){
+ return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+}
+
+std::string& Message::getContentType(){
+ return getHeaderProperties()->getContentType();
+}
+
+std::string& Message::getContentEncoding(){
+ return getHeaderProperties()->getContentEncoding();
+}
+
+FieldTable& Message::getHeaders(){
+ return getHeaderProperties()->getHeaders();
+}
+
+u_int8_t Message::getDeliveryMode(){
+ return getHeaderProperties()->getDeliveryMode();
+}
+
+u_int8_t Message::getPriority(){
+ return getHeaderProperties()->getPriority();
+}
+
+std::string& Message::getCorrelationId(){
+ return getHeaderProperties()->getCorrelationId();
+}
+
+std::string& Message::getReplyTo(){
+ return getHeaderProperties()->getReplyTo();
+}
+
+std::string& Message::getExpiration(){
+ return getHeaderProperties()->getExpiration();
+}
+
+std::string& Message::getMessageId(){
+ return getHeaderProperties()->getMessageId();
+}
+
+u_int64_t Message::getTimestamp(){
+ return getHeaderProperties()->getTimestamp();
+}
+
+std::string& Message::getType(){
+ return getHeaderProperties()->getType();
+}
+
+std::string& Message::getUserId(){
+ return getHeaderProperties()->getUserId();
+}
+
+std::string& Message::getAppId(){
+ return getHeaderProperties()->getAppId();
+}
+
+std::string& Message::getClusterId(){
+ return getHeaderProperties()->getClusterId();
+}
+
+void Message::setContentType(std::string& type){
+ getHeaderProperties()->setContentType(type);
+}
+
+void Message::setContentEncoding(std::string& encoding){
+ getHeaderProperties()->setContentEncoding(encoding);
+}
+
+void Message::setHeaders(FieldTable& headers){
+ getHeaderProperties()->setHeaders(headers);
+}
+
+void Message::setDeliveryMode(u_int8_t mode){
+ getHeaderProperties()->setDeliveryMode(mode);
+}
+
+void Message::setPriority(u_int8_t priority){
+ getHeaderProperties()->setPriority(priority);
+}
+
+void Message::setCorrelationId(std::string& correlationId){
+ getHeaderProperties()->setCorrelationId(correlationId);
+}
+
+void Message::setReplyTo(std::string& replyTo){
+ getHeaderProperties()->setReplyTo(replyTo);
+}
+
+void Message::setExpiration(std::string& expiration){
+ getHeaderProperties()->setExpiration(expiration);
+}
+
+void Message::setMessageId(std::string& messageId){
+ getHeaderProperties()->setMessageId(messageId);
+}
+
+void Message::setTimestamp(u_int64_t timestamp){
+ getHeaderProperties()->setTimestamp(timestamp);
+}
+
+void Message::setType(std::string& type){
+ getHeaderProperties()->setType(type);
+}
+
+void Message::setUserId(std::string& userId){
+ getHeaderProperties()->setUserId(userId);
+}
+
+void Message::setAppId(std::string& appId){
+ getHeaderProperties()->setAppId(appId);
+}
+
+void Message::setClusterId(std::string& clusterId){
+ getHeaderProperties()->setClusterId(clusterId);
+}
diff --git a/cpp/client/src/Queue.cpp b/cpp/client/src/Queue.cpp
new file mode 100644
index 0000000000..cb957dd993
--- /dev/null
+++ b/cpp/client/src/Queue.cpp
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Queue.h"
+
+qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true){}
+
+qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false){}
+
+qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp){}
+
+qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive)
+ : name(_name), autodelete(_autodelete), exclusive(_exclusive){}
+
+const std::string& qpid::client::Queue::getName() const{
+ return name;
+}
+
+void qpid::client::Queue::setName(const std::string& name){
+ this->name = name;
+}
+
+bool qpid::client::Queue::isAutoDelete() const{
+ return autodelete;
+}
+
+bool qpid::client::Queue::isExclusive() const{
+ return exclusive;
+}
+
+
+
+
diff --git a/cpp/client/src/ResponseHandler.cpp b/cpp/client/src/ResponseHandler.cpp
new file mode 100644
index 0000000000..837bba37fd
--- /dev/null
+++ b/cpp/client/src/ResponseHandler.cpp
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "ResponseHandler.h"
+#include "MonitorImpl.h"
+#include "QpidError.h"
+
+qpid::client::ResponseHandler::ResponseHandler() : waiting(false){
+ monitor = new qpid::concurrent::MonitorImpl();
+}
+
+qpid::client::ResponseHandler::~ResponseHandler(){
+ delete monitor;
+}
+
+bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
+ return expected.match(response.get());
+}
+
+void qpid::client::ResponseHandler::waitForResponse(){
+ monitor->acquire();
+ if(waiting){
+ monitor->wait();
+ }
+ monitor->release();
+}
+
+void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr response){
+ this->response = response;
+ monitor->acquire();
+ waiting = false;
+ monitor->notify();
+ monitor->release();
+}
+
+void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
+ monitor->acquire();
+ if(waiting){
+ monitor->wait();
+ }
+ monitor->release();
+ if(!validate(expected)){
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
+ }
+}
+
+void qpid::client::ResponseHandler::expect(){
+ waiting = true;
+}