diff options
Diffstat (limited to 'cpp/client/src')
-rw-r--r-- | cpp/client/src/Channel.cpp | 432 | ||||
-rw-r--r-- | cpp/client/src/Connection.cpp | 237 | ||||
-rw-r--r-- | cpp/client/src/Exchange.cpp | 30 | ||||
-rw-r--r-- | cpp/client/src/IncomingMessage.cpp | 85 | ||||
-rw-r--r-- | cpp/client/src/Message.cpp | 147 | ||||
-rw-r--r-- | cpp/client/src/Queue.cpp | 47 | ||||
-rw-r--r-- | cpp/client/src/ResponseHandler.cpp | 63 |
7 files changed, 1041 insertions, 0 deletions
diff --git a/cpp/client/src/Channel.cpp b/cpp/client/src/Channel.cpp new file mode 100644 index 0000000000..e965f7e5dd --- /dev/null +++ b/cpp/client/src/Channel.cpp @@ -0,0 +1,432 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Channel.h" +#include "MonitorImpl.h" +#include "ThreadFactoryImpl.h" +#include "Message.h" +#include "QpidError.h" + +using namespace std::tr1;//to use dynamic_pointer_cast +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::concurrent; + +Channel::Channel(bool _transactional, u_int16_t _prefetch) : id(0), incoming(0), con(0), out(0), + prefetch(_prefetch), + transactional(_transactional), + dispatcher(0), + closed(true){ + threadFactory = new ThreadFactoryImpl(); + dispatchMonitor = new MonitorImpl(); + retrievalMonitor = new MonitorImpl(); +} + +Channel::~Channel(){ + if(dispatcher){ + stop(); + delete dispatcher; + } + delete retrievalMonitor; + delete dispatchMonitor; + delete threadFactory; +} + +void Channel::setPrefetch(u_int16_t prefetch){ + this->prefetch = prefetch; + if(con != 0 && out != 0){ + setQos(); + } +} + +void Channel::setQos(){ + sendAndReceive(new AMQFrame(id, new BasicQosBody(0, prefetch, false)), basic_qos_ok); + if(transactional){ + sendAndReceive(new AMQFrame(id, new TxSelectBody()), tx_select_ok); + } +} + +void Channel::declareExchange(Exchange& exchange, bool synch){ + string name = exchange.getName(); + string type = exchange.getType(); + FieldTable args; + AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(0, name, type, false, false, false, false, !synch, args)); + if(synch){ + sendAndReceive(frame, exchange_declare_ok); + }else{ + out->send(frame); + } +} + +void Channel::deleteExchange(Exchange& exchange, bool synch){ + string name = exchange.getName(); + AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(0, name, false, !synch)); + if(synch){ + sendAndReceive(frame, exchange_delete_ok); + }else{ + out->send(frame); + } +} + +void Channel::declareQueue(Queue& queue, bool synch){ + string name = queue.getName(); + FieldTable args; + AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(0, name, false, false, + queue.isExclusive(), + queue.isAutoDelete(), !synch, args)); + if(synch){ + sendAndReceive(frame, queue_declare_ok); + if(queue.getName().length() == 0){ + QueueDeclareOkBody::shared_ptr response = + dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse()); + queue.setName(response->getQueue()); + } + }else{ + out->send(frame); + } +} + +void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ + //ticket, queue, ifunused, ifempty, nowait + string name = queue.getName(); + AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(0, name, ifunused, ifempty, !synch)); + if(synch){ + sendAndReceive(frame, queue_delete_ok); + }else{ + out->send(frame); + } +} + +void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ + string e = exchange.getName(); + string q = queue.getName(); + AMQFrame* frame = new AMQFrame(id, new QueueBindBody(0, q, e, (string&) key,!synch, (FieldTable&) args)); + if(synch){ + sendAndReceive(frame, queue_bind_ok); + }else{ + out->send(frame); + } +} + +void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener, + int ackMode, bool noLocal, bool synch){ + + string q = queue.getName(); + AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch)); + if(synch){ + sendAndReceive(frame, basic_consume_ok); + BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse()); + tag = response->getConsumerTag(); + }else{ + out->send(frame); + } + Consumer* c = new Consumer(); + c->listener = listener; + c->ackMode = ackMode; + c->lastDeliveryTag = 0; + consumers[tag] = c; +} + +void Channel::cancel(std::string& tag, bool synch){ + Consumer* c = consumers[tag]; + if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ + out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); + } + + AMQFrame* frame = new AMQFrame(id, new BasicCancelBody((string&) tag, !synch)); + if(synch){ + sendAndReceive(frame, basic_cancel_ok); + }else{ + out->send(frame); + } + consumers.erase(tag); + if(c != 0){ + delete c; + } +} + +void Channel::cancelAll(){ + int count(consumers.size()); + for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){ + Consumer* c = i->second; + if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){ + out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); + } + consumers.erase(i); + delete c; + } +} + +void Channel::retrieve(Message& msg){ + retrievalMonitor->acquire(); + while(retrieved == 0){ + retrievalMonitor->wait(); + } + + msg.header = retrieved->getHeader(); + msg.deliveryTag = retrieved->getDeliveryTag(); + retrieved->getData(msg.data); + delete retrieved; + retrieved = 0; + + retrievalMonitor->release(); +} + +bool Channel::get(Message& msg, const Queue& queue, int ackMode){ + string name = queue.getName(); + AMQFrame* frame = new AMQFrame(id, new BasicGetBody(0, name, ackMode)); + responses.expect(); + out->send(frame); + responses.waitForResponse(); + AMQMethodBody::shared_ptr response = responses.getResponse(); + if(basic_get_ok.match(response.get())){ + if(incoming != 0){ + std::cout << "Existing message not complete" << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); + }else{ + incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response)); + } + retrieve(msg); + return true; + }if(basic_get_empty.match(response.get())){ + return false; + }else{ + THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get."); + } +} + + +void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ + string e = exchange.getName(); + string key = routingKey; + + out->send(new AMQFrame(id, new BasicPublishBody(0, e, key, mandatory, immediate))); + //break msg up into header frame and content frame(s) and send these + string data = msg.getData(); + msg.header->setContentSize(data.length()); + AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); + out->send(new AMQFrame(id, body)); + + int data_length = data.length(); + if(data_length > 0){ + //TODO fragmentation of messages, need to know max frame size for connection + int frag_size = con->getMaxFrameSize() - 4; + if(data_length < frag_size){ + out->send(new AMQFrame(id, new AMQContentBody(data))); + }else{ + int frag_count = data_length / frag_size; + for(int i = 0; i < frag_count; i++){ + int pos = i*frag_size; + int len = i < frag_count - 1 ? frag_size : data_length - pos; + string frag(data.substr(pos, len)); + out->send(new AMQFrame(id, new AMQContentBody(frag))); + } + } + } +} + +void Channel::commit(){ + AMQFrame* frame = new AMQFrame(id, new TxCommitBody()); + sendAndReceive(frame, tx_commit_ok); +} + +void Channel::rollback(){ + AMQFrame* frame = new AMQFrame(id, new TxRollbackBody()); + sendAndReceive(frame, tx_rollback_ok); +} + +void Channel::handleMethod(AMQMethodBody::shared_ptr body){ + //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request + if(responses.isWaiting()){ + responses.signalResponse(body); + }else if(basic_deliver.match(body.get())){ + if(incoming != 0){ + std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); + }else{ + incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); + } + }else if(basic_return.match(body.get())){ + if(incoming != 0){ + std::cout << "Existing message not complete" << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); + }else{ + incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); + } + }else if(channel_close.match(body.get())){ + con->removeChannel(this); + //need to signal application that channel has been closed through exception + + }else if(channel_flow.match(body.get())){ + + }else{ + //signal error + std::cout << "Unhandled method: " << *body << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method"); + } +} + +void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ + if(incoming == 0){ + //handle invalid frame sequence + std::cout << "Invalid message sequence: got header before return or deliver." << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver."); + }else{ + incoming->setHeader(body); + if(incoming->isComplete()){ + enqueue(); + } + } +} + +void Channel::handleContent(AMQContentBody::shared_ptr body){ + if(incoming == 0){ + //handle invalid frame sequence + std::cout << "Invalid message sequence: got content before return or deliver." << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver."); + }else{ + incoming->addContent(body); + if(incoming->isComplete()){ + enqueue(); + } + } +} + +void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); +} + +void Channel::start(){ + dispatcher = threadFactory->create(this); + dispatcher->start(); +} + +void Channel::stop(){ + closed = true; + dispatchMonitor->acquire(); + dispatchMonitor->notify(); + dispatchMonitor->release(); + if(dispatcher){ + dispatcher->join(); + } +} + +void Channel::run(){ + dispatch(); +} + +void Channel::enqueue(){ + if(incoming->isResponse()){ + retrievalMonitor->acquire(); + retrieved = incoming; + retrievalMonitor->notify(); + retrievalMonitor->release(); + }else{ + dispatchMonitor->acquire(); + messages.push(incoming); + dispatchMonitor->notify(); + dispatchMonitor->release(); + } + incoming = 0; +} + +IncomingMessage* Channel::dequeue(){ + dispatchMonitor->acquire(); + while(messages.empty() && !closed){ + dispatchMonitor->wait(); + } + IncomingMessage* msg = 0; + if(!messages.empty()){ + msg = messages.front(); + messages.pop(); + } + dispatchMonitor->release(); + return msg; +} + +void Channel::deliver(Consumer* consumer, Message& msg){ + //record delivery tag: + consumer->lastDeliveryTag = msg.getDeliveryTag(); + + //allow registered listener to handle the message + consumer->listener->received(msg); + + //if the handler calls close on the channel or connection while + //handling this message, then consumer will now have been deleted. + if(!closed){ + bool multiple(false); + switch(consumer->ackMode){ + case LAZY_ACK: + multiple = true; + if(++(consumer->count) < prefetch) break; + //else drop-through + case AUTO_ACK: + out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple))); + consumer->lastDeliveryTag = 0; + } + } + + //as it stands, transactionality is entirely orthogonal to ack + //mode, though the acks will not be processed by the broker under + //a transaction until it commits. +} + +void Channel::dispatch(){ + while(!closed){ + IncomingMessage* incomingMsg = dequeue(); + if(incomingMsg){ + //Note: msg is currently only valid for duration of this call + Message msg(incomingMsg->getHeader()); + incomingMsg->getData(msg.data); + if(incomingMsg->isReturn()){ + if(returnsHandler == 0){ + //print warning to log/console + std::cout << "Message returned: " << msg.getData() << std::endl; + }else{ + returnsHandler->returned(msg); + } + }else{ + msg.deliveryTag = incomingMsg->getDeliveryTag(); + std::string tag = incomingMsg->getConsumerTag(); + + if(consumers[tag] == 0){ + //signal error + std::cout << "Unknown consumer: " << tag << std::endl; + }else{ + deliver(consumers[tag], msg); + } + } + delete incomingMsg; + } + } +} + +void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ + returnsHandler = handler; +} + +void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ + responses.expect(); + out->send(frame); + responses.receive(body); +} + +void Channel::close(){ + if(con != 0){ + con->closeChannel(this); + } +} diff --git a/cpp/client/src/Connection.cpp b/cpp/client/src/Connection.cpp new file mode 100644 index 0000000000..eeb2330561 --- /dev/null +++ b/cpp/client/src/Connection.cpp @@ -0,0 +1,237 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Connection.h" +#include "Channel.h" +#include "ConnectorImpl.h" +#include "Message.h" +#include "QpidError.h" +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::io; +using namespace qpid::concurrent; + +u_int16_t Connection::channelIdCounter; + +Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){ + connector = new ConnectorImpl(debug, _max_frame_size); +} + +Connection::~Connection(){ + delete connector; +} + +void Connection::open(const std::string& host, int port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ + this->host = host; + this->port = port; + connector->setInputHandler(this); + connector->setTimeoutHandler(this); + connector->setShutdownHandler(this); + out = connector->getOutputHandler(); + connector->connect(host, port); + + ProtocolInitiation* header = new ProtocolInitiation(8, 0); + responses.expect(); + connector->init(header); + responses.receive(connection_start); + + FieldTable props; + string mechanism("PLAIN"); + string response = ((char)0) + uid + ((char)0) + pwd; + string locale("en_US"); + responses.expect(); + out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale))); + + /** + * Assume for now that further challenges will not be required + //receive connection.secure + responses.receive(connection_secure)); + //send connection.secure-ok + out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); + **/ + + responses.receive(connection_tune); + + ConnectionTuneBody::shared_ptr proposal = std::tr1::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); + out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); + + u_int16_t heartbeat = proposal->getHeartbeat(); + connector->setReadTimeout(heartbeat * 2); + connector->setWriteTimeout(heartbeat); + + //send connection.open + string capabilities; + string vhost = virtualhost; + responses.expect(); + out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true))); + //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). + responses.waitForResponse(); + if(responses.validate(connection_open_ok)){ + //ok + }else if(responses.validate(connection_redirect)){ + //ignore for now + ConnectionRedirectBody::shared_ptr redirect(std::tr1::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); + std::cout << "Received redirection to " << redirect->getHost() << std::endl; + }else{ + THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); + } + +} + +void Connection::close(){ + if(!closed){ + u_int16_t code(200); + string text("Ok"); + u_int16_t classId(0); + u_int16_t methodId(0); + + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok); + connector->close(); + } +} + +void Connection::openChannel(Channel* channel){ + channel->con = this; + channel->id = ++channelIdCounter; + channel->out = out; + channels[channel->id] = channel; + //now send frame to open channel and wait for response + string oob; + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok); + channel->setQos(); + channel->closed = false; +} + +void Connection::closeChannel(Channel* channel){ + //send frame to close channel + u_int16_t code(200); + string text("Ok"); + u_int16_t classId(0); + u_int16_t methodId(0); + closeChannel(channel, code, text, classId, methodId); +} + +void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ + //send frame to close channel + channel->cancelAll(); + channel->closed = true; + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok); + channel->con = 0; + channel->out = 0; + removeChannel(channel); +} + +void Connection::removeChannel(Channel* channel){ + //send frame to close channel + + channels.erase(channel->id); + channel->out = 0; + channel->id = 0; + channel->con = 0; +} + +void Connection::received(AMQFrame* frame){ + u_int16_t channelId = frame->getChannel(); + + if(channelId == 0){ + this->handleBody(frame->getBody()); + }else{ + Channel* channel = channels[channelId]; + if(channel == 0){ + error(504, "Unknown channel"); + }else{ + try{ + channel->handleBody(frame->getBody()); + }catch(qpid::QpidError e){ + channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); + } + } + } +} + +void Connection::handleMethod(AMQMethodBody::shared_ptr body){ + //connection.close, basic.deliver, basic.return or a response to a synchronous request + if(responses.isWaiting()){ + responses.signalResponse(body); + }else if(connection_close.match(body.get())){ + //send back close ok + //close socket + ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); + std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; + connector->close(); + }else{ + std::cout << "Unhandled method for connection: " << *body << std::endl; + error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); + } +} + +void Connection::handleHeader(AMQHeaderBody::shared_ptr body){ + error(504, "Channel error: received header body with channel 0."); +} + +void Connection::handleContent(AMQContentBody::shared_ptr body){ + error(504, "Channel error: received content body with channel 0."); +} + +void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ +} + +void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ + responses.expect(); + out->send(frame); + responses.receive(body); +} + +void Connection::error(int code, const string& msg, int classid, int methodid){ + std::cout << "Connection exception generated: " << code << msg; + if(classid || methodid){ + std::cout << " [" << methodid << ":" << classid << "]"; + } + std::cout << std::endl; + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, (string&) msg, classid, methodid)), connection_close_ok); + connector->close(); +} + +void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ + std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.file << ":" << e.line << ")" << std::endl; + int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; + string msg = e.msg; + if(method == 0){ + closeChannel(channel, code, msg); + }else{ + closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); + } +} + +void Connection::idleIn(){ + std::cout << "Connection timed out due to abscence of heartbeat." << std::endl; + connector->close(); +} + +void Connection::idleOut(){ + out->send(new AMQFrame(0, new AMQHeartbeatBody())); +} + +void Connection::shutdown(){ + closed = true; + //close all channels + for(iterator i = channels.begin(); i != channels.end(); i++){ + i->second->stop(); + } +} diff --git a/cpp/client/src/Exchange.cpp b/cpp/client/src/Exchange.cpp new file mode 100644 index 0000000000..681068dc4c --- /dev/null +++ b/cpp/client/src/Exchange.cpp @@ -0,0 +1,30 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Exchange.h" + +qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} +const std::string& qpid::client::Exchange::getName() const { return name; } +const std::string& qpid::client::Exchange::getType() const { return type; } + +const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; +const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; +const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; + +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/cpp/client/src/IncomingMessage.cpp b/cpp/client/src/IncomingMessage.cpp new file mode 100644 index 0000000000..8e2604c4cb --- /dev/null +++ b/cpp/client/src/IncomingMessage.cpp @@ -0,0 +1,85 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "IncomingMessage.h" +#include "QpidError.h" +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + +IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){} +IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){} +IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){} + +IncomingMessage::~IncomingMessage(){ +} + +void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr header){ + this->header = header; +} + +void IncomingMessage::addContent(AMQContentBody::shared_ptr content){ + this->content.push_back(content); +} + +bool IncomingMessage::isComplete(){ + return header != 0 && header->getContentSize() == contentSize(); +} + +bool IncomingMessage::isReturn(){ + return returned; +} + +bool IncomingMessage::isDelivery(){ + return delivered; +} + +bool IncomingMessage::isResponse(){ + return response; +} + +string& IncomingMessage::getConsumerTag(){ + if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery"); + return delivered->getConsumerTag(); +} + +u_int64_t IncomingMessage::getDeliveryTag(){ + if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery"); + return delivered->getDeliveryTag(); +} + +AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){ + return header; +} + +void IncomingMessage::getData(string& s){ + int count(content.size()); + for(int i = 0; i < count; i++){ + if(i == 0) s = content[i]->getData(); + else s += content[i]->getData(); + } +} + +long IncomingMessage::contentSize(){ + long size(0); + int count(content.size()); + for(int i = 0; i < count; i++){ + size += content[i]->size(); + } + return size; +} diff --git a/cpp/client/src/Message.cpp b/cpp/client/src/Message.cpp new file mode 100644 index 0000000000..71befe57b1 --- /dev/null +++ b/cpp/client/src/Message.cpp @@ -0,0 +1,147 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Message.h" + +using namespace qpid::client; +using namespace qpid::framing; + +Message::Message(){ + header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)); +} + +Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ +} + +Message::~Message(){ +} + +BasicHeaderProperties* Message::getHeaderProperties(){ + return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); +} + +std::string& Message::getContentType(){ + return getHeaderProperties()->getContentType(); +} + +std::string& Message::getContentEncoding(){ + return getHeaderProperties()->getContentEncoding(); +} + +FieldTable& Message::getHeaders(){ + return getHeaderProperties()->getHeaders(); +} + +u_int8_t Message::getDeliveryMode(){ + return getHeaderProperties()->getDeliveryMode(); +} + +u_int8_t Message::getPriority(){ + return getHeaderProperties()->getPriority(); +} + +std::string& Message::getCorrelationId(){ + return getHeaderProperties()->getCorrelationId(); +} + +std::string& Message::getReplyTo(){ + return getHeaderProperties()->getReplyTo(); +} + +std::string& Message::getExpiration(){ + return getHeaderProperties()->getExpiration(); +} + +std::string& Message::getMessageId(){ + return getHeaderProperties()->getMessageId(); +} + +u_int64_t Message::getTimestamp(){ + return getHeaderProperties()->getTimestamp(); +} + +std::string& Message::getType(){ + return getHeaderProperties()->getType(); +} + +std::string& Message::getUserId(){ + return getHeaderProperties()->getUserId(); +} + +std::string& Message::getAppId(){ + return getHeaderProperties()->getAppId(); +} + +std::string& Message::getClusterId(){ + return getHeaderProperties()->getClusterId(); +} + +void Message::setContentType(std::string& type){ + getHeaderProperties()->setContentType(type); +} + +void Message::setContentEncoding(std::string& encoding){ + getHeaderProperties()->setContentEncoding(encoding); +} + +void Message::setHeaders(FieldTable& headers){ + getHeaderProperties()->setHeaders(headers); +} + +void Message::setDeliveryMode(u_int8_t mode){ + getHeaderProperties()->setDeliveryMode(mode); +} + +void Message::setPriority(u_int8_t priority){ + getHeaderProperties()->setPriority(priority); +} + +void Message::setCorrelationId(std::string& correlationId){ + getHeaderProperties()->setCorrelationId(correlationId); +} + +void Message::setReplyTo(std::string& replyTo){ + getHeaderProperties()->setReplyTo(replyTo); +} + +void Message::setExpiration(std::string& expiration){ + getHeaderProperties()->setExpiration(expiration); +} + +void Message::setMessageId(std::string& messageId){ + getHeaderProperties()->setMessageId(messageId); +} + +void Message::setTimestamp(u_int64_t timestamp){ + getHeaderProperties()->setTimestamp(timestamp); +} + +void Message::setType(std::string& type){ + getHeaderProperties()->setType(type); +} + +void Message::setUserId(std::string& userId){ + getHeaderProperties()->setUserId(userId); +} + +void Message::setAppId(std::string& appId){ + getHeaderProperties()->setAppId(appId); +} + +void Message::setClusterId(std::string& clusterId){ + getHeaderProperties()->setClusterId(clusterId); +} diff --git a/cpp/client/src/Queue.cpp b/cpp/client/src/Queue.cpp new file mode 100644 index 0000000000..cb957dd993 --- /dev/null +++ b/cpp/client/src/Queue.cpp @@ -0,0 +1,47 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Queue.h" + +qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true){} + +qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false){} + +qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp){} + +qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive) + : name(_name), autodelete(_autodelete), exclusive(_exclusive){} + +const std::string& qpid::client::Queue::getName() const{ + return name; +} + +void qpid::client::Queue::setName(const std::string& name){ + this->name = name; +} + +bool qpid::client::Queue::isAutoDelete() const{ + return autodelete; +} + +bool qpid::client::Queue::isExclusive() const{ + return exclusive; +} + + + + diff --git a/cpp/client/src/ResponseHandler.cpp b/cpp/client/src/ResponseHandler.cpp new file mode 100644 index 0000000000..837bba37fd --- /dev/null +++ b/cpp/client/src/ResponseHandler.cpp @@ -0,0 +1,63 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "ResponseHandler.h" +#include "MonitorImpl.h" +#include "QpidError.h" + +qpid::client::ResponseHandler::ResponseHandler() : waiting(false){ + monitor = new qpid::concurrent::MonitorImpl(); +} + +qpid::client::ResponseHandler::~ResponseHandler(){ + delete monitor; +} + +bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ + return expected.match(response.get()); +} + +void qpid::client::ResponseHandler::waitForResponse(){ + monitor->acquire(); + if(waiting){ + monitor->wait(); + } + monitor->release(); +} + +void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr response){ + this->response = response; + monitor->acquire(); + waiting = false; + monitor->notify(); + monitor->release(); +} + +void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ + monitor->acquire(); + if(waiting){ + monitor->wait(); + } + monitor->release(); + if(!validate(expected)){ + THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); + } +} + +void qpid::client::ResponseHandler::expect(){ + waiting = true; +} |