diff options
author | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 |
commit | 8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch) | |
tree | 1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/client/src | |
parent | 9a808fb13aba243d41bbdab75158dae5939a80a4 (diff) | |
download | qpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz |
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/client/src')
-rw-r--r-- | cpp/client/src/Channel.cpp | 438 | ||||
-rw-r--r-- | cpp/client/src/Connection.cpp | 237 | ||||
-rw-r--r-- | cpp/client/src/Exchange.cpp | 30 | ||||
-rw-r--r-- | cpp/client/src/IncomingMessage.cpp | 85 | ||||
-rw-r--r-- | cpp/client/src/Message.cpp | 147 | ||||
-rw-r--r-- | cpp/client/src/MessageListener.cpp | 21 | ||||
-rw-r--r-- | cpp/client/src/Queue.cpp | 47 | ||||
-rw-r--r-- | cpp/client/src/ResponseHandler.cpp | 63 | ||||
-rw-r--r-- | cpp/client/src/ReturnedMessageHandler.cpp | 21 |
9 files changed, 0 insertions, 1089 deletions
diff --git a/cpp/client/src/Channel.cpp b/cpp/client/src/Channel.cpp deleted file mode 100644 index cf2f5bc081..0000000000 --- a/cpp/client/src/Channel.cpp +++ /dev/null @@ -1,438 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Channel.h" -#include "MonitorImpl.h" -#include "ThreadFactoryImpl.h" -#include "Message.h" -#include "QpidError.h" - -using namespace std::tr1;//to use dynamic_pointer_cast -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::concurrent; - -Channel::Channel(bool _transactional, u_int16_t _prefetch) : - id(0), - con(0), - dispatcher(0), - out(0), - incoming(0), - closed(true), - prefetch(_prefetch), - transactional(_transactional) -{ - threadFactory = new ThreadFactoryImpl(); - dispatchMonitor = new MonitorImpl(); - retrievalMonitor = new MonitorImpl(); -} - -Channel::~Channel(){ - if(dispatcher){ - stop(); - delete dispatcher; - } - delete retrievalMonitor; - delete dispatchMonitor; - delete threadFactory; -} - -void Channel::setPrefetch(u_int16_t _prefetch){ - prefetch = _prefetch; - if(con != 0 && out != 0){ - setQos(); - } -} - -void Channel::setQos(){ - sendAndReceive(new AMQFrame(id, new BasicQosBody(0, prefetch, false)), basic_qos_ok); - if(transactional){ - sendAndReceive(new AMQFrame(id, new TxSelectBody()), tx_select_ok); - } -} - -void Channel::declareExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - string type = exchange.getType(); - FieldTable args; - AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(0, name, type, false, false, false, false, !synch, args)); - if(synch){ - sendAndReceive(frame, exchange_declare_ok); - }else{ - out->send(frame); - } -} - -void Channel::deleteExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(0, name, false, !synch)); - if(synch){ - sendAndReceive(frame, exchange_delete_ok); - }else{ - out->send(frame); - } -} - -void Channel::declareQueue(Queue& queue, bool synch){ - string name = queue.getName(); - FieldTable args; - AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(0, name, false, false, - queue.isExclusive(), - queue.isAutoDelete(), !synch, args)); - if(synch){ - sendAndReceive(frame, queue_declare_ok); - if(queue.getName().length() == 0){ - QueueDeclareOkBody::shared_ptr response = - dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse()); - queue.setName(response->getQueue()); - } - }else{ - out->send(frame); - } -} - -void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ - //ticket, queue, ifunused, ifempty, nowait - string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(0, name, ifunused, ifempty, !synch)); - if(synch){ - sendAndReceive(frame, queue_delete_ok); - }else{ - out->send(frame); - } -} - -void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ - string e = exchange.getName(); - string q = queue.getName(); - // TODO aconway 2006-10-10: not const correct, get rid of const_cast. - // - AMQFrame* frame = new AMQFrame(id, new QueueBindBody(0, q, e, key,!synch, const_cast<FieldTable&>(args))); - if(synch){ - sendAndReceive(frame, queue_bind_ok); - }else{ - out->send(frame); - } -} - -void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener, - int ackMode, bool noLocal, bool synch){ - - string q = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch)); - if(synch){ - sendAndReceive(frame, basic_consume_ok); - BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse()); - tag = response->getConsumerTag(); - }else{ - out->send(frame); - } - Consumer* c = new Consumer(); - c->listener = listener; - c->ackMode = ackMode; - c->lastDeliveryTag = 0; - consumers[tag] = c; -} - -void Channel::cancel(std::string& tag, bool synch){ - Consumer* c = consumers[tag]; - if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); - } - - AMQFrame* frame = new AMQFrame(id, new BasicCancelBody((string&) tag, !synch)); - if(synch){ - sendAndReceive(frame, basic_cancel_ok); - }else{ - out->send(frame); - } - consumers.erase(tag); - if(c != 0){ - delete c; - } -} - -void Channel::cancelAll(){ - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){ - Consumer* c = i->second; - if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); - } - consumers.erase(i); - delete c; - } -} - -void Channel::retrieve(Message& msg){ - retrievalMonitor->acquire(); - while(retrieved == 0){ - retrievalMonitor->wait(); - } - - msg.header = retrieved->getHeader(); - msg.deliveryTag = retrieved->getDeliveryTag(); - retrieved->getData(msg.data); - delete retrieved; - retrieved = 0; - - retrievalMonitor->release(); -} - -bool Channel::get(Message& msg, const Queue& queue, int ackMode){ - string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new BasicGetBody(0, name, ackMode)); - responses.expect(); - out->send(frame); - responses.waitForResponse(); - AMQMethodBody::shared_ptr response = responses.getResponse(); - if(basic_get_ok.match(response.get())){ - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response)); - } - retrieve(msg); - return true; - }if(basic_get_empty.match(response.get())){ - return false; - }else{ - THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get."); - } -} - - -void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ - string e = exchange.getName(); - string key = routingKey; - - out->send(new AMQFrame(id, new BasicPublishBody(0, e, key, mandatory, immediate))); - //break msg up into header frame and content frame(s) and send these - string data = msg.getData(); - msg.header->setContentSize(data.length()); - AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); - out->send(new AMQFrame(id, body)); - - int data_length = data.length(); - if(data_length > 0){ - //TODO fragmentation of messages, need to know max frame size for connection - int frag_size = con->getMaxFrameSize() - 4; - if(data_length < frag_size){ - out->send(new AMQFrame(id, new AMQContentBody(data))); - }else{ - int frag_count = data_length / frag_size; - for(int i = 0; i < frag_count; i++){ - int pos = i*frag_size; - int len = i < frag_count - 1 ? frag_size : data_length - pos; - string frag(data.substr(pos, len)); - out->send(new AMQFrame(id, new AMQContentBody(frag))); - } - } - } -} - -void Channel::commit(){ - AMQFrame* frame = new AMQFrame(id, new TxCommitBody()); - sendAndReceive(frame, tx_commit_ok); -} - -void Channel::rollback(){ - AMQFrame* frame = new AMQFrame(id, new TxRollbackBody()); - sendAndReceive(frame, tx_rollback_ok); -} - -void Channel::handleMethod(AMQMethodBody::shared_ptr body){ - //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request - if(responses.isWaiting()){ - responses.signalResponse(body); - }else if(basic_deliver.match(body.get())){ - if(incoming != 0){ - std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); - } - }else if(basic_return.match(body.get())){ - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); - } - }else if(channel_close.match(body.get())){ - con->removeChannel(this); - //need to signal application that channel has been closed through exception - - }else if(channel_flow.match(body.get())){ - - }else{ - //signal error - std::cout << "Unhandled method: " << *body << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method"); - } -} - -void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got header before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver."); - }else{ - incoming->setHeader(body); - if(incoming->isComplete()){ - enqueue(); - } - } -} - -void Channel::handleContent(AMQContentBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got content before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver."); - }else{ - incoming->addContent(body); - if(incoming->isComplete()){ - enqueue(); - } - } -} - -void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); -} - -void Channel::start(){ - dispatcher = threadFactory->create(this); - dispatcher->start(); -} - -void Channel::stop(){ - closed = true; - dispatchMonitor->acquire(); - dispatchMonitor->notify(); - dispatchMonitor->release(); - if(dispatcher){ - dispatcher->join(); - } -} - -void Channel::run(){ - dispatch(); -} - -void Channel::enqueue(){ - if(incoming->isResponse()){ - retrievalMonitor->acquire(); - retrieved = incoming; - retrievalMonitor->notify(); - retrievalMonitor->release(); - }else{ - dispatchMonitor->acquire(); - messages.push(incoming); - dispatchMonitor->notify(); - dispatchMonitor->release(); - } - incoming = 0; -} - -IncomingMessage* Channel::dequeue(){ - dispatchMonitor->acquire(); - while(messages.empty() && !closed){ - dispatchMonitor->wait(); - } - IncomingMessage* msg = 0; - if(!messages.empty()){ - msg = messages.front(); - messages.pop(); - } - dispatchMonitor->release(); - return msg; -} - -void Channel::deliver(Consumer* consumer, Message& msg){ - //record delivery tag: - consumer->lastDeliveryTag = msg.getDeliveryTag(); - - //allow registered listener to handle the message - consumer->listener->received(msg); - - //if the handler calls close on the channel or connection while - //handling this message, then consumer will now have been deleted. - if(!closed){ - bool multiple(false); - switch(consumer->ackMode){ - case LAZY_ACK: - multiple = true; - if(++(consumer->count) < prefetch) break; - //else drop-through - case AUTO_ACK: - out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple))); - consumer->lastDeliveryTag = 0; - } - } - - //as it stands, transactionality is entirely orthogonal to ack - //mode, though the acks will not be processed by the broker under - //a transaction until it commits. -} - -void Channel::dispatch(){ - while(!closed){ - IncomingMessage* incomingMsg = dequeue(); - if(incomingMsg){ - //Note: msg is currently only valid for duration of this call - Message msg(incomingMsg->getHeader()); - incomingMsg->getData(msg.data); - if(incomingMsg->isReturn()){ - if(returnsHandler == 0){ - //print warning to log/console - std::cout << "Message returned: " << msg.getData() << std::endl; - }else{ - returnsHandler->returned(msg); - } - }else{ - msg.deliveryTag = incomingMsg->getDeliveryTag(); - std::string tag = incomingMsg->getConsumerTag(); - - if(consumers[tag] == 0){ - //signal error - std::cout << "Unknown consumer: " << tag << std::endl; - }else{ - deliver(consumers[tag], msg); - } - } - delete incomingMsg; - } - } -} - -void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ - returnsHandler = handler; -} - -void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ - responses.expect(); - out->send(frame); - responses.receive(body); -} - -void Channel::close(){ - if(con != 0){ - con->closeChannel(this); - } -} diff --git a/cpp/client/src/Connection.cpp b/cpp/client/src/Connection.cpp deleted file mode 100644 index f332d2242c..0000000000 --- a/cpp/client/src/Connection.cpp +++ /dev/null @@ -1,237 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Connection.h" -#include "Channel.h" -#include "ConnectorImpl.h" -#include "Message.h" -#include "QpidError.h" -#include <iostream> - -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::io; -using namespace qpid::concurrent; - -u_int16_t Connection::channelIdCounter; - -Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){ - connector = new ConnectorImpl(debug, _max_frame_size); -} - -Connection::~Connection(){ - delete connector; -} - -void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ - host = _host; - port = _port; - connector->setInputHandler(this); - connector->setTimeoutHandler(this); - connector->setShutdownHandler(this); - out = connector->getOutputHandler(); - connector->connect(host, port); - - ProtocolInitiation* header = new ProtocolInitiation(8, 0); - responses.expect(); - connector->init(header); - responses.receive(connection_start); - - FieldTable props; - string mechanism("PLAIN"); - string response = ((char)0) + uid + ((char)0) + pwd; - string locale("en_US"); - responses.expect(); - out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale))); - - /** - * Assume for now that further challenges will not be required - //receive connection.secure - responses.receive(connection_secure)); - //send connection.secure-ok - out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); - **/ - - responses.receive(connection_tune); - - ConnectionTuneBody::shared_ptr proposal = std::tr1::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); - out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); - - u_int16_t heartbeat = proposal->getHeartbeat(); - connector->setReadTimeout(heartbeat * 2); - connector->setWriteTimeout(heartbeat); - - //send connection.open - string capabilities; - string vhost = virtualhost; - responses.expect(); - out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true))); - //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). - responses.waitForResponse(); - if(responses.validate(connection_open_ok)){ - //ok - }else if(responses.validate(connection_redirect)){ - //ignore for now - ConnectionRedirectBody::shared_ptr redirect(std::tr1::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); - std::cout << "Received redirection to " << redirect->getHost() << std::endl; - }else{ - THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); - } - -} - -void Connection::close(){ - if(!closed){ - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok); - connector->close(); - } -} - -void Connection::openChannel(Channel* channel){ - channel->con = this; - channel->id = ++channelIdCounter; - channel->out = out; - channels[channel->id] = channel; - //now send frame to open channel and wait for response - string oob; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok); - channel->setQos(); - channel->closed = false; -} - -void Connection::closeChannel(Channel* channel){ - //send frame to close channel - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - closeChannel(channel, code, text, classId, methodId); -} - -void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ - //send frame to close channel - channel->cancelAll(); - channel->closed = true; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok); - channel->con = 0; - channel->out = 0; - removeChannel(channel); -} - -void Connection::removeChannel(Channel* channel){ - //send frame to close channel - - channels.erase(channel->id); - channel->out = 0; - channel->id = 0; - channel->con = 0; -} - -void Connection::received(AMQFrame* frame){ - u_int16_t channelId = frame->getChannel(); - - if(channelId == 0){ - this->handleBody(frame->getBody()); - }else{ - Channel* channel = channels[channelId]; - if(channel == 0){ - error(504, "Unknown channel"); - }else{ - try{ - channel->handleBody(frame->getBody()); - }catch(qpid::QpidError e){ - channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); - } - } - } -} - -void Connection::handleMethod(AMQMethodBody::shared_ptr body){ - //connection.close, basic.deliver, basic.return or a response to a synchronous request - if(responses.isWaiting()){ - responses.signalResponse(body); - }else if(connection_close.match(body.get())){ - //send back close ok - //close socket - ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); - std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; - connector->close(); - }else{ - std::cout << "Unhandled method for connection: " << *body << std::endl; - error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); - } -} - -void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){ - error(504, "Channel error: received header body with channel 0."); -} - -void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){ - error(504, "Channel error: received content body with channel 0."); -} - -void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ -} - -void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ - responses.expect(); - out->send(frame); - responses.receive(body); -} - -void Connection::error(int code, const string& msg, int classid, int methodid){ - std::cout << "Connection exception generated: " << code << msg; - if(classid || methodid){ - std::cout << " [" << methodid << ":" << classid << "]"; - } - std::cout << std::endl; - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, msg, classid, methodid)), connection_close_ok); - connector->close(); -} - -void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ - std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.file << ":" << e.line << ")" << std::endl; - int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; - string msg = e.msg; - if(method == 0){ - closeChannel(channel, code, msg); - }else{ - closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); - } -} - -void Connection::idleIn(){ - std::cout << "Connection timed out due to abscence of heartbeat." << std::endl; - connector->close(); -} - -void Connection::idleOut(){ - out->send(new AMQFrame(0, new AMQHeartbeatBody())); -} - -void Connection::shutdown(){ - closed = true; - //close all channels - for(iterator i = channels.begin(); i != channels.end(); i++){ - i->second->stop(); - } -} diff --git a/cpp/client/src/Exchange.cpp b/cpp/client/src/Exchange.cpp deleted file mode 100644 index 681068dc4c..0000000000 --- a/cpp/client/src/Exchange.cpp +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Exchange.h" - -qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} -const std::string& qpid::client::Exchange::getName() const { return name; } -const std::string& qpid::client::Exchange::getType() const { return type; } - -const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; -const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; -const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; - -const qpid::client::Exchange qpid::client::Exchange::DEFAULT_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::DEFAULT_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::DEFAULT_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/cpp/client/src/IncomingMessage.cpp b/cpp/client/src/IncomingMessage.cpp deleted file mode 100644 index 9576051302..0000000000 --- a/cpp/client/src/IncomingMessage.cpp +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "IncomingMessage.h" -#include "QpidError.h" -#include <iostream> - -using namespace qpid::client; -using namespace qpid::framing; - -IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){} -IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){} -IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){} - -IncomingMessage::~IncomingMessage(){ -} - -void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){ - this->header = _header; -} - -void IncomingMessage::addContent(AMQContentBody::shared_ptr _content){ - this->content.push_back(_content); -} - -bool IncomingMessage::isComplete(){ - return header != 0 && header->getContentSize() == contentSize(); -} - -bool IncomingMessage::isReturn(){ - return returned; -} - -bool IncomingMessage::isDelivery(){ - return delivered; -} - -bool IncomingMessage::isResponse(){ - return response; -} - -const string& IncomingMessage::getConsumerTag(){ - if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery"); - return delivered->getConsumerTag(); -} - -u_int64_t IncomingMessage::getDeliveryTag(){ - if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery"); - return delivered->getDeliveryTag(); -} - -AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){ - return header; -} - -void IncomingMessage::getData(string& s){ - int count(content.size()); - for(int i = 0; i < count; i++){ - if(i == 0) s = content[i]->getData(); - else s += content[i]->getData(); - } -} - -u_int64_t IncomingMessage::contentSize(){ - u_int64_t size(0); - u_int64_t count(content.size()); - for(u_int64_t i = 0; i < count; i++){ - size += content[i]->size(); - } - return size; -} diff --git a/cpp/client/src/Message.cpp b/cpp/client/src/Message.cpp deleted file mode 100644 index 71befe57b1..0000000000 --- a/cpp/client/src/Message.cpp +++ /dev/null @@ -1,147 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Message.h" - -using namespace qpid::client; -using namespace qpid::framing; - -Message::Message(){ - header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)); -} - -Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ -} - -Message::~Message(){ -} - -BasicHeaderProperties* Message::getHeaderProperties(){ - return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); -} - -std::string& Message::getContentType(){ - return getHeaderProperties()->getContentType(); -} - -std::string& Message::getContentEncoding(){ - return getHeaderProperties()->getContentEncoding(); -} - -FieldTable& Message::getHeaders(){ - return getHeaderProperties()->getHeaders(); -} - -u_int8_t Message::getDeliveryMode(){ - return getHeaderProperties()->getDeliveryMode(); -} - -u_int8_t Message::getPriority(){ - return getHeaderProperties()->getPriority(); -} - -std::string& Message::getCorrelationId(){ - return getHeaderProperties()->getCorrelationId(); -} - -std::string& Message::getReplyTo(){ - return getHeaderProperties()->getReplyTo(); -} - -std::string& Message::getExpiration(){ - return getHeaderProperties()->getExpiration(); -} - -std::string& Message::getMessageId(){ - return getHeaderProperties()->getMessageId(); -} - -u_int64_t Message::getTimestamp(){ - return getHeaderProperties()->getTimestamp(); -} - -std::string& Message::getType(){ - return getHeaderProperties()->getType(); -} - -std::string& Message::getUserId(){ - return getHeaderProperties()->getUserId(); -} - -std::string& Message::getAppId(){ - return getHeaderProperties()->getAppId(); -} - -std::string& Message::getClusterId(){ - return getHeaderProperties()->getClusterId(); -} - -void Message::setContentType(std::string& type){ - getHeaderProperties()->setContentType(type); -} - -void Message::setContentEncoding(std::string& encoding){ - getHeaderProperties()->setContentEncoding(encoding); -} - -void Message::setHeaders(FieldTable& headers){ - getHeaderProperties()->setHeaders(headers); -} - -void Message::setDeliveryMode(u_int8_t mode){ - getHeaderProperties()->setDeliveryMode(mode); -} - -void Message::setPriority(u_int8_t priority){ - getHeaderProperties()->setPriority(priority); -} - -void Message::setCorrelationId(std::string& correlationId){ - getHeaderProperties()->setCorrelationId(correlationId); -} - -void Message::setReplyTo(std::string& replyTo){ - getHeaderProperties()->setReplyTo(replyTo); -} - -void Message::setExpiration(std::string& expiration){ - getHeaderProperties()->setExpiration(expiration); -} - -void Message::setMessageId(std::string& messageId){ - getHeaderProperties()->setMessageId(messageId); -} - -void Message::setTimestamp(u_int64_t timestamp){ - getHeaderProperties()->setTimestamp(timestamp); -} - -void Message::setType(std::string& type){ - getHeaderProperties()->setType(type); -} - -void Message::setUserId(std::string& userId){ - getHeaderProperties()->setUserId(userId); -} - -void Message::setAppId(std::string& appId){ - getHeaderProperties()->setAppId(appId); -} - -void Message::setClusterId(std::string& clusterId){ - getHeaderProperties()->setClusterId(clusterId); -} diff --git a/cpp/client/src/MessageListener.cpp b/cpp/client/src/MessageListener.cpp deleted file mode 100644 index cf55db8edf..0000000000 --- a/cpp/client/src/MessageListener.cpp +++ /dev/null @@ -1,21 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "MessageListener.h" - -qpid::client::MessageListener::~MessageListener() {} diff --git a/cpp/client/src/Queue.cpp b/cpp/client/src/Queue.cpp deleted file mode 100644 index 5b2881f7ff..0000000000 --- a/cpp/client/src/Queue.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Queue.h" - -qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true){} - -qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false){} - -qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp){} - -qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive) - : name(_name), autodelete(_autodelete), exclusive(_exclusive){} - -const std::string& qpid::client::Queue::getName() const{ - return name; -} - -void qpid::client::Queue::setName(const std::string& _name){ - name = _name; -} - -bool qpid::client::Queue::isAutoDelete() const{ - return autodelete; -} - -bool qpid::client::Queue::isExclusive() const{ - return exclusive; -} - - - - diff --git a/cpp/client/src/ResponseHandler.cpp b/cpp/client/src/ResponseHandler.cpp deleted file mode 100644 index 6938539469..0000000000 --- a/cpp/client/src/ResponseHandler.cpp +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "ResponseHandler.h" -#include "MonitorImpl.h" -#include "QpidError.h" - -qpid::client::ResponseHandler::ResponseHandler() : waiting(false){ - monitor = new qpid::concurrent::MonitorImpl(); -} - -qpid::client::ResponseHandler::~ResponseHandler(){ - delete monitor; -} - -bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ - return expected.match(response.get()); -} - -void qpid::client::ResponseHandler::waitForResponse(){ - monitor->acquire(); - if(waiting){ - monitor->wait(); - } - monitor->release(); -} - -void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){ - response = _response; - monitor->acquire(); - waiting = false; - monitor->notify(); - monitor->release(); -} - -void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ - monitor->acquire(); - if(waiting){ - monitor->wait(); - } - monitor->release(); - if(!validate(expected)){ - THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); - } -} - -void qpid::client::ResponseHandler::expect(){ - waiting = true; -} diff --git a/cpp/client/src/ReturnedMessageHandler.cpp b/cpp/client/src/ReturnedMessageHandler.cpp deleted file mode 100644 index cfa91fee97..0000000000 --- a/cpp/client/src/ReturnedMessageHandler.cpp +++ /dev/null @@ -1,21 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "ReturnedMessageHandler.h" - -qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {} |