diff options
Diffstat (limited to 'cpp/src/qpid/client')
21 files changed, 0 insertions, 2101 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp deleted file mode 100644 index ab4ea5b787..0000000000 --- a/cpp/src/qpid/client/Channel.cpp +++ /dev/null @@ -1,428 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <qpid/client/Channel.h> -#include <qpid/sys/Monitor.h> -#include <qpid/client/Message.h> -#include <qpid/QpidError.h> -#include <qpid/client/MethodBodyInstances.h> - -using namespace boost; //to use dynamic_pointer_cast -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::sys; - -Channel::Channel(bool _transactional, u_int16_t _prefetch) : - id(0), - con(0), - out(0), - incoming(0), - closed(true), - prefetch(_prefetch), - transactional(_transactional), -// AMQP version management change - kpvdr 2006-11-20 -// TODO: Make this class version-aware and link these hard-wired numbers to that version - version(8, 0) -{ } - -Channel::~Channel(){ - stop(); -} - -void Channel::setPrefetch(u_int16_t _prefetch){ - prefetch = _prefetch; - if(con != 0 && out != 0){ - setQos(); - } -} - -void Channel::setQos(){ -// AMQP version management change - kpvdr 2006-11-20 -// TODO: Make this class version-aware and link these hard-wired numbers to that version - sendAndReceive(new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok); - if(transactional){ - sendAndReceive(new AMQFrame(id, new TxSelectBody(version)), method_bodies.tx_select_ok); - } -} - -void Channel::declareExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - string type = exchange.getType(); - FieldTable args; - AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.exchange_declare_ok); - }else{ - out->send(frame); - } -} - -void Channel::deleteExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.exchange_delete_ok); - }else{ - out->send(frame); - } -} - -void Channel::declareQueue(Queue& queue, bool synch){ - string name = queue.getName(); - FieldTable args; - AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false, - queue.isExclusive(), - queue.isAutoDelete(), !synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_declare_ok); - if(queue.getName().length() == 0){ - QueueDeclareOkBody::shared_ptr response = - dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse()); - queue.setName(response->getQueue()); - } - }else{ - out->send(frame); - } -} - -void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ - //ticket, queue, ifunused, ifempty, nowait - string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_delete_ok); - }else{ - out->send(frame); - } -} - -void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ - string e = exchange.getName(); - string q = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, q, e, key,!synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_bind_ok); - }else{ - out->send(frame); - } -} - -void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener, - int ackMode, bool noLocal, bool synch){ - - string q = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(version, 0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.basic_consume_ok); - BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse()); - tag = response->getConsumerTag(); - }else{ - out->send(frame); - } - Consumer* c = new Consumer(); - c->listener = listener; - c->ackMode = ackMode; - c->lastDeliveryTag = 0; - consumers[tag] = c; -} - -void Channel::cancel(std::string& tag, bool synch){ - Consumer* c = consumers[tag]; - if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true))); - } - - AMQFrame* frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.basic_cancel_ok); - }else{ - out->send(frame); - } - consumers.erase(tag); - if(c != 0){ - delete c; - } -} - -void Channel::cancelAll(){ - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){ - Consumer* c = i->second; - if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); - } - consumers.erase(i); - delete c; - } -} - -void Channel::retrieve(Message& msg){ - Monitor::ScopedLock l(retrievalMonitor); - while(retrieved == 0){ - retrievalMonitor.wait(); - } - - msg.header = retrieved->getHeader(); - msg.deliveryTag = retrieved->getDeliveryTag(); - retrieved->getData(msg.data); - delete retrieved; - retrieved = 0; -} - -bool Channel::get(Message& msg, const Queue& queue, int ackMode){ - string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode)); - responses.expect(); - out->send(frame); - responses.waitForResponse(); - AMQMethodBody::shared_ptr response = responses.getResponse(); - if(method_bodies.basic_get_ok.match(response.get())){ - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response)); - } - retrieve(msg); - return true; - }if(method_bodies.basic_get_empty.match(response.get())){ - return false; - }else{ - THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get."); - } -} - - -void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ - string e = exchange.getName(); - string key = routingKey; - - out->send(new AMQFrame(id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); - //break msg up into header frame and content frame(s) and send these - string data = msg.getData(); - msg.header->setContentSize(data.length()); - AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); - out->send(new AMQFrame(id, body)); - - u_int64_t data_length = data.length(); - if(data_length > 0){ - u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes - if(data_length < frag_size){ - out->send(new AMQFrame(id, new AMQContentBody(data))); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); - out->send(new AMQFrame(id, new AMQContentBody(frag))); - - offset += length; - remaining = data_length - offset; - } - } - } -} - -void Channel::commit(){ - AMQFrame* frame = new AMQFrame(id, new TxCommitBody(version)); - sendAndReceive(frame, method_bodies.tx_commit_ok); -} - -void Channel::rollback(){ - AMQFrame* frame = new AMQFrame(id, new TxRollbackBody(version)); - sendAndReceive(frame, method_bodies.tx_rollback_ok); -} - -void Channel::handleMethod(AMQMethodBody::shared_ptr body){ - //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request - if(responses.isWaiting()){ - responses.signalResponse(body); - }else if(method_bodies.basic_deliver.match(body.get())){ - if(incoming != 0){ - std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); - } - }else if(method_bodies.basic_return.match(body.get())){ - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); - } - }else if(method_bodies.channel_close.match(body.get())){ - con->removeChannel(this); - //need to signal application that channel has been closed through exception - - }else if(method_bodies.channel_flow.match(body.get())){ - - }else{ - //signal error - std::cout << "Unhandled method: " << *body << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method"); - } -} - -void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got header before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver."); - }else{ - incoming->setHeader(body); - if(incoming->isComplete()){ - enqueue(); - } - } -} - -void Channel::handleContent(AMQContentBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got content before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver."); - }else{ - incoming->addContent(body); - if(incoming->isComplete()){ - enqueue(); - } - } -} - -void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); -} - -void Channel::start(){ - dispatcher = Thread(this); -} - -void Channel::stop(){ - { - Monitor::ScopedLock l(dispatchMonitor); - closed = true; - dispatchMonitor.notify(); - } - dispatcher.join(); -} - -void Channel::run(){ - dispatch(); -} - -void Channel::enqueue(){ - if(incoming->isResponse()){ - Monitor::ScopedLock l(retrievalMonitor); - retrieved = incoming; - retrievalMonitor.notify(); - }else{ - Monitor::ScopedLock l(dispatchMonitor); - messages.push(incoming); - dispatchMonitor.notify(); - } - incoming = 0; -} - -IncomingMessage* Channel::dequeue(){ - Monitor::ScopedLock l(dispatchMonitor); - while(messages.empty() && !closed){ - dispatchMonitor.wait(); - } - IncomingMessage* msg = 0; - if(!messages.empty()){ - msg = messages.front(); - messages.pop(); - } - return msg; -} - -void Channel::deliver(Consumer* consumer, Message& msg){ - //record delivery tag: - consumer->lastDeliveryTag = msg.getDeliveryTag(); - - //allow registered listener to handle the message - consumer->listener->received(msg); - - //if the handler calls close on the channel or connection while - //handling this message, then consumer will now have been deleted. - if(!closed){ - bool multiple(false); - switch(consumer->ackMode){ - case LAZY_ACK: - multiple = true; - if(++(consumer->count) < prefetch) break; - //else drop-through - case AUTO_ACK: - out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple))); - consumer->lastDeliveryTag = 0; - } - } - - //as it stands, transactionality is entirely orthogonal to ack - //mode, though the acks will not be processed by the broker under - //a transaction until it commits. -} - -void Channel::dispatch(){ - while(!closed){ - IncomingMessage* incomingMsg = dequeue(); - if(incomingMsg){ - //Note: msg is currently only valid for duration of this call - Message msg(incomingMsg->getHeader()); - incomingMsg->getData(msg.data); - if(incomingMsg->isReturn()){ - if(returnsHandler == 0){ - //print warning to log/console - std::cout << "Message returned: " << msg.getData() << std::endl; - }else{ - returnsHandler->returned(msg); - } - }else{ - msg.deliveryTag = incomingMsg->getDeliveryTag(); - std::string tag = incomingMsg->getConsumerTag(); - - if(consumers[tag] == 0){ - //signal error - std::cout << "Unknown consumer: " << tag << std::endl; - }else{ - deliver(consumers[tag], msg); - } - } - delete incomingMsg; - } - } -} - -void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ - returnsHandler = handler; -} - -void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ - responses.expect(); - out->send(frame); - responses.receive(body); -} - -void Channel::close(){ - if(con != 0){ - con->closeChannel(this); - } -} diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h deleted file mode 100644 index e850c1c626..0000000000 --- a/cpp/src/qpid/client/Channel.h +++ /dev/null @@ -1,127 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <map> -#include <string> -#include <queue> -#include "sys/types.h" - -#ifndef _Channel_ -#define _Channel_ - -#include <qpid/framing/amqp_framing.h> -#include <qpid/client/Connection.h> -#include <qpid/client/Exchange.h> -#include <qpid/client/IncomingMessage.h> -#include <qpid/client/Message.h> -#include <qpid/client/MessageListener.h> -#include <qpid/client/Queue.h> -#include <qpid/client/ResponseHandler.h> -#include <qpid/client/ReturnedMessageHandler.h> - -namespace qpid { -namespace client { - enum ack_modes {NO_ACK=0, AUTO_ACK=1, LAZY_ACK=2, CLIENT_ACK=3}; - - class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::sys::Runnable{ - struct Consumer{ - MessageListener* listener; - int ackMode; - int count; - u_int64_t lastDeliveryTag; - }; - typedef std::map<std::string,Consumer*>::iterator consumer_iterator; - - u_int16_t id; - Connection* con; - qpid::sys::Thread dispatcher; - qpid::framing::OutputHandler* out; - IncomingMessage* incoming; - ResponseHandler responses; - std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume - IncomingMessage* retrieved;//holds response to basic.get - qpid::sys::Monitor dispatchMonitor; - qpid::sys::Monitor retrievalMonitor; - std::map<std::string, Consumer*> consumers; - ReturnedMessageHandler* returnsHandler; - bool closed; - - u_int16_t prefetch; - const bool transactional; - qpid::framing::ProtocolVersion version; - - void enqueue(); - void retrieve(Message& msg); - IncomingMessage* dequeue(); - void dispatch(); - void stop(); - void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); - void deliver(Consumer* consumer, Message& msg); - void setQos(); - void cancelAll(); - - virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); - virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); - virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); - virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); - - public: - Channel(bool transactional = false, u_int16_t prefetch = 500); - ~Channel(); - - void declareExchange(Exchange& exchange, bool synch = true); - void deleteExchange(Exchange& exchange, bool synch = true); - void declareQueue(Queue& queue, bool synch = true); - void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); - void bind(const Exchange& exchange, const Queue& queue, const std::string& key, - const qpid::framing::FieldTable& args, bool synch = true); - void consume(Queue& queue, std::string& tag, MessageListener* listener, - int ackMode = NO_ACK, bool noLocal = false, bool synch = true); - void cancel(std::string& tag, bool synch = true); - bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK); - void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, - bool mandatory = false, bool immediate = false); - - void commit(); - void rollback(); - - void setPrefetch(u_int16_t prefetch); - - /** - * Start message dispatching on a new thread - */ - void start(); - /** - * Do message dispatching on this thread - */ - void run(); - - void close(); - - void setReturnedMessageHandler(ReturnedMessageHandler* handler); - - friend class Connection; - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp deleted file mode 100644 index 0b520d169d..0000000000 --- a/cpp/src/qpid/client/Connection.cpp +++ /dev/null @@ -1,244 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <qpid/client/Connection.h> -#include <qpid/client/Channel.h> -#include <qpid/client/Message.h> -#include <qpid/QpidError.h> -#include <iostream> -#include <qpid/client/MethodBodyInstances.h> - -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::sys; -using namespace qpid::sys; - -u_int16_t Connection::channelIdCounter; - -Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true), -// AMQP version management change - kpvdr 2006-11-20 -// TODO: Make this class version-aware and link these hard-wired numbers to that version - version(8, 0) -{ - connector = new Connector(debug, _max_frame_size); -} - -Connection::~Connection(){ - delete connector; -} - -void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ - host = _host; - port = _port; - connector->setInputHandler(this); - connector->setTimeoutHandler(this); - connector->setShutdownHandler(this); - out = connector->getOutputHandler(); - connector->connect(host, port); - - ProtocolInitiation* header = new ProtocolInitiation(8, 0); - responses.expect(); - connector->init(header); - responses.receive(method_bodies.connection_start); - - FieldTable props; - string mechanism("PLAIN"); - string response = ((char)0) + uid + ((char)0) + pwd; - string locale("en_US"); - responses.expect(); - out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); - - /** - * Assume for now that further challenges will not be required - //receive connection.secure - responses.receive(connection_secure)); - //send connection.secure-ok - out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); - **/ - - responses.receive(method_bodies.connection_tune); - - ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); - out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); - - u_int16_t heartbeat = proposal->getHeartbeat(); - connector->setReadTimeout(heartbeat * 2); - connector->setWriteTimeout(heartbeat); - - //send connection.open - string capabilities; - string vhost = virtualhost; - responses.expect(); - out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true))); - //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). - responses.waitForResponse(); - if(responses.validate(method_bodies.connection_open_ok)){ - //ok - }else if(responses.validate(method_bodies.connection_redirect)){ - //ignore for now - ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); - std::cout << "Received redirection to " << redirect->getHost() << std::endl; - }else{ - THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); - } - -} - -void Connection::close(){ - if(!closed){ - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); - connector->close(); - } -} - -void Connection::openChannel(Channel* channel){ - channel->con = this; - channel->id = ++channelIdCounter; - channel->out = out; - channels[channel->id] = channel; - //now send frame to open channel and wait for response - string oob; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); - channel->setQos(); - channel->closed = false; -} - -void Connection::closeChannel(Channel* channel){ - //send frame to close channel - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - closeChannel(channel, code, text, classId, methodId); -} - -void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ - //send frame to close channel - channel->cancelAll(); - channel->closed = true; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); - channel->con = 0; - channel->out = 0; - removeChannel(channel); -} - -void Connection::removeChannel(Channel* channel){ - //send frame to close channel - - channels.erase(channel->id); - channel->out = 0; - channel->id = 0; - channel->con = 0; -} - -void Connection::received(AMQFrame* frame){ - u_int16_t channelId = frame->getChannel(); - - if(channelId == 0){ - this->handleBody(frame->getBody()); - }else{ - Channel* channel = channels[channelId]; - if(channel == 0){ - error(504, "Unknown channel"); - }else{ - try{ - channel->handleBody(frame->getBody()); - }catch(qpid::QpidError e){ - channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); - } - } - } -} - -void Connection::handleMethod(AMQMethodBody::shared_ptr body){ - //connection.close, basic.deliver, basic.return or a response to a synchronous request - if(responses.isWaiting()){ - responses.signalResponse(body); - }else if(method_bodies.connection_close.match(body.get())){ - //send back close ok - //close socket - ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); - std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; - connector->close(); - }else{ - std::cout << "Unhandled method for connection: " << *body << std::endl; - error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); - } -} - -void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){ - error(504, "Channel error: received header body with channel 0."); -} - -void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){ - error(504, "Channel error: received content body with channel 0."); -} - -void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ -} - -void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ - responses.expect(); - out->send(frame); - responses.receive(body); -} - -void Connection::error(int code, const string& msg, int classid, int methodid){ - std::cout << "Connection exception generated: " << code << msg; - if(classid || methodid){ - std::cout << " [" << methodid << ":" << classid << "]"; - } - std::cout << std::endl; - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); - connector->close(); -} - -void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ - std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl; - int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; - string msg = e.msg; - if(method == 0){ - closeChannel(channel, code, msg); - }else{ - closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); - } -} - -void Connection::idleIn(){ - std::cout << "Connection timed out due to abscence of heartbeat." << std::endl; - connector->close(); -} - -void Connection::idleOut(){ - out->send(new AMQFrame(0, new AMQHeartbeatBody())); -} - -void Connection::shutdown(){ - closed = true; - //close all channels - for(iterator i = channels.begin(); i != channels.end(); i++){ - i->second->stop(); - } -} diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h deleted file mode 100644 index c7b1fb8dd0..0000000000 --- a/cpp/src/qpid/client/Connection.h +++ /dev/null @@ -1,109 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <map> -#include <string> - -#ifndef _Connection_ -#define _Connection_ - -#include <qpid/QpidError.h> -#include <qpid/client/Connector.h> -#include <qpid/sys/ShutdownHandler.h> -#include <qpid/sys/TimeoutHandler.h> - -#include <qpid/framing/amqp_framing.h> -#include <qpid/client/Exchange.h> -#include <qpid/client/IncomingMessage.h> -#include <qpid/client/Message.h> -#include <qpid/client/MessageListener.h> -#include <qpid/client/Queue.h> -#include <qpid/client/ResponseHandler.h> - -namespace qpid { -namespace client { - - class Channel; - -class Connection : public virtual qpid::framing::InputHandler, - public virtual qpid::sys::TimeoutHandler, - public virtual qpid::sys::ShutdownHandler, - private virtual qpid::framing::BodyHandler{ - - typedef std::map<int, Channel*>::iterator iterator; - - static u_int16_t channelIdCounter; - - std::string host; - int port; - const u_int32_t max_frame_size; - std::map<int, Channel*> channels; - Connector* connector; - qpid::framing::OutputHandler* out; - ResponseHandler responses; - volatile bool closed; - qpid::framing::ProtocolVersion version; - - void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e); - void error(int code, const std::string& msg, int classid = 0, int methodid = 0); - void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0); - void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); - - virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); - virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); - virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); - virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); - - public: - - Connection(bool debug = false, u_int32_t max_frame_size = 65536); - ~Connection(); - void open(const std::string& host, int port = 5672, - const std::string& uid = "guest", const std::string& pwd = "guest", - const std::string& virtualhost = "/"); - void close(); - void openChannel(Channel* channel); - /* - * Requests that the server close this channel, then removes - * the association to the channel from this connection - */ - void closeChannel(Channel* channel); - /* - * Removes the channel from association with this connection, - * without sending a close request to the server. - */ - void removeChannel(Channel* channel); - - virtual void received(qpid::framing::AMQFrame* frame); - - virtual void idleOut(); - virtual void idleIn(); - - virtual void shutdown(); - - inline u_int32_t getMaxFrameSize(){ return max_frame_size; } - }; - - -} -} - - -#endif diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp deleted file mode 100644 index 116ea74193..0000000000 --- a/cpp/src/qpid/client/Connector.cpp +++ /dev/null @@ -1,180 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <qpid/QpidError.h> -#include <qpid/sys/Time.h> -#include "Connector.h" - -using namespace qpid::sys; -using namespace qpid::client; -using namespace qpid::framing; -using qpid::QpidError; - -Connector::Connector(bool _debug, u_int32_t buffer_size) : - debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - closed(true), - lastIn(0), lastOut(0), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size){ } - -Connector::~Connector(){ } - -void Connector::connect(const std::string& host, int port){ - socket = Socket::createTcp(); - socket.connect(host, port); - closed = false; - receiver = Thread(this); -} - -void Connector::init(ProtocolInitiation* header){ - writeBlock(header); - delete header; -} - -void Connector::close(){ - closed = true; - socket.close(); - receiver.join(); -} - -void Connector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void Connector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* Connector::getOutputHandler(){ - return this; -} - -void Connector::send(AMQFrame* frame){ - writeBlock(frame); - if(debug) std::cout << "SENT: " << *frame << std::endl; - delete frame; -} - -void Connector::writeBlock(AMQDataBlock* data){ - Mutex::ScopedLock l(writeLock); - data->encode(outbuf); - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); -} - -void Connector::writeToSocket(char* data, size_t available){ - size_t written = 0; - while(written < available && !closed){ - ssize_t sent = socket.send(data + written, available-written); - if(sent > 0) { - lastOut = now() * TIME_MSEC; - written += sent; - } - } -} - -void Connector::handleClosed(){ - closed = true; - socket.close(); - if(shutdownHandler) shutdownHandler->shutdown(); -} - -void Connector::checkIdle(ssize_t status){ - if(timeoutHandler){ - Time t = now() * TIME_MSEC; - if(status == Socket::SOCKET_TIMEOUT) { - if(idleIn && (t - lastIn > idleIn)){ - timeoutHandler->idleIn(); - } - }else if(status == Socket::SOCKET_EOF){ - handleClosed(); - }else{ - lastIn = t; - } - if(idleOut && (t - lastOut > idleOut)){ - timeoutHandler->idleOut(); - } - } -} - -void Connector::setReadTimeout(u_int16_t t){ - idleIn = t * 1000;//t is in secs - if(idleIn && (!timeout || idleIn < timeout)){ - timeout = idleIn; - setSocketTimeout(); - } - -} - -void Connector::setWriteTimeout(u_int16_t t){ - idleOut = t * 1000;//t is in secs - if(idleOut && (!timeout || idleOut < timeout)){ - timeout = idleOut; - setSocketTimeout(); - } -} - -void Connector::setSocketTimeout(){ - socket.setTimeout(timeout*TIME_MSEC); -} - -void Connector::setTimeoutHandler(TimeoutHandler* handler){ - timeoutHandler = handler; -} - -void Connector::run(){ - try{ - while(!closed){ - ssize_t available = inbuf.available(); - if(available < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - ssize_t received = socket.recv(inbuf.start(), available); - checkIdle(received); - - if(!closed && received > 0){ - inbuf.move(received); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame; - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: " << frame << std::endl; - input->received(&frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg - << " (" << error.location.file << ":" << error.location.line - << ")" << std::endl; - handleClosed(); - } -} diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h deleted file mode 100644 index 08705a4eb3..0000000000 --- a/cpp/src/qpid/client/Connector.h +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Connector_ -#define _Connector_ - - -#include <qpid/framing/InputHandler.h> -#include <qpid/framing/OutputHandler.h> -#include <qpid/framing/InitiationHandler.h> -#include <qpid/framing/ProtocolInitiation.h> -#include <qpid/sys/ShutdownHandler.h> -#include <qpid/sys/TimeoutHandler.h> -#include <qpid/sys/Thread.h> -#include <qpid/sys/Monitor.h> -#include <qpid/sys/Socket.h> - -namespace qpid { -namespace client { - - class Connector : public qpid::framing::OutputHandler, - private qpid::sys::Runnable - { - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - - bool closed; - - int64_t lastIn; - int64_t lastOut; - int64_t timeout; - u_int32_t idleIn; - u_int32_t idleOut; - - qpid::sys::TimeoutHandler* timeoutHandler; - qpid::sys::ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; - - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; - - qpid::sys::Mutex writeLock; - qpid::sys::Thread receiver; - - qpid::sys::Socket socket; - - void checkIdle(ssize_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - void handleClosed(); - - public: - Connector(bool debug = false, u_int32_t buffer_size = 1024); - virtual ~Connector(); - virtual void connect(const std::string& host, int port); - virtual void init(qpid::framing::ProtocolInitiation* header); - virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); - virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void setReadTimeout(u_int16_t timeout); - virtual void setWriteTimeout(u_int16_t timeout); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/Exchange.cpp b/cpp/src/qpid/client/Exchange.cpp deleted file mode 100644 index 5a4cfe45ad..0000000000 --- a/cpp/src/qpid/client/Exchange.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <qpid/client/Exchange.h> - -qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} -const std::string& qpid::client::Exchange::getName() const { return name; } -const std::string& qpid::client::Exchange::getType() const { return type; } - -const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; -const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; -const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; - -const qpid::client::Exchange qpid::client::Exchange::DEFAULT_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::DEFAULT_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::DEFAULT_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/cpp/src/qpid/client/Exchange.h b/cpp/src/qpid/client/Exchange.h deleted file mode 100644 index 8edc62713b..0000000000 --- a/cpp/src/qpid/client/Exchange.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _Exchange_ -#define _Exchange_ - -namespace qpid { -namespace client { - - class Exchange{ - const std::string name; - const std::string type; - - public: - - static const std::string DIRECT_EXCHANGE; - static const std::string TOPIC_EXCHANGE; - static const std::string HEADERS_EXCHANGE; - - static const Exchange DEFAULT_DIRECT_EXCHANGE; - static const Exchange DEFAULT_TOPIC_EXCHANGE; - static const Exchange DEFAULT_HEADERS_EXCHANGE; - - Exchange(std::string name, std::string type = DIRECT_EXCHANGE); - const std::string& getName() const; - const std::string& getType() const; - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/IncomingMessage.cpp b/cpp/src/qpid/client/IncomingMessage.cpp deleted file mode 100644 index 6dabc16be4..0000000000 --- a/cpp/src/qpid/client/IncomingMessage.cpp +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <qpid/client/IncomingMessage.h> -#include <qpid/QpidError.h> -#include <iostream> - -using namespace qpid::client; -using namespace qpid::framing; - -IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){} -IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){} -IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){} - -IncomingMessage::~IncomingMessage(){ -} - -void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){ - this->header = _header; -} - -void IncomingMessage::addContent(AMQContentBody::shared_ptr _content){ - this->content.push_back(_content); -} - -bool IncomingMessage::isComplete(){ - return header != 0 && header->getContentSize() == contentSize(); -} - -bool IncomingMessage::isReturn(){ - return returned; -} - -bool IncomingMessage::isDelivery(){ - return delivered; -} - -bool IncomingMessage::isResponse(){ - return response; -} - -const string& IncomingMessage::getConsumerTag(){ - if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery"); - return delivered->getConsumerTag(); -} - -u_int64_t IncomingMessage::getDeliveryTag(){ - if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery"); - return delivered->getDeliveryTag(); -} - -AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){ - return header; -} - -void IncomingMessage::getData(string& s){ - int count(content.size()); - for(int i = 0; i < count; i++){ - if(i == 0) s = content[i]->getData(); - else s += content[i]->getData(); - } -} - -u_int64_t IncomingMessage::contentSize(){ - u_int64_t size(0); - u_int64_t count(content.size()); - for(u_int64_t i = 0; i < count; i++){ - size += content[i]->size(); - } - return size; -} diff --git a/cpp/src/qpid/client/IncomingMessage.h b/cpp/src/qpid/client/IncomingMessage.h deleted file mode 100644 index b72c78575e..0000000000 --- a/cpp/src/qpid/client/IncomingMessage.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <vector> -#include <qpid/framing/amqp_framing.h> - -#ifndef _IncomingMessage_ -#define _IncomingMessage_ - -#include <qpid/client/Message.h> - -namespace qpid { -namespace client { - - class IncomingMessage{ - //content will be preceded by one of these method frames - qpid::framing::BasicDeliverBody::shared_ptr delivered; - qpid::framing::BasicReturnBody::shared_ptr returned; - qpid::framing::BasicGetOkBody::shared_ptr response; - qpid::framing::AMQHeaderBody::shared_ptr header; - std::vector<qpid::framing::AMQContentBody::shared_ptr> content; - - u_int64_t contentSize(); - public: - IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro); - IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro); - IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro); - ~IncomingMessage(); - void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); - void addContent(qpid::framing::AMQContentBody::shared_ptr content); - bool isComplete(); - bool isReturn(); - bool isDelivery(); - bool isResponse(); - const std::string& getConsumerTag();//only relevant if isDelivery() - qpid::framing::AMQHeaderBody::shared_ptr& getHeader(); - u_int64_t getDeliveryTag(); - void getData(std::string& data); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/Message.cpp b/cpp/src/qpid/client/Message.cpp deleted file mode 100644 index 3871d54e1c..0000000000 --- a/cpp/src/qpid/client/Message.cpp +++ /dev/null @@ -1,150 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <qpid/client/Message.h> - -using namespace qpid::client; -using namespace qpid::framing; - -Message::Message(){ - header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)); -} - -Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ -} - -Message::~Message(){ -} - -BasicHeaderProperties* Message::getHeaderProperties(){ - return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); -} - -const std::string& Message::getContentType(){ - return getHeaderProperties()->getContentType(); -} - -const std::string& Message::getContentEncoding(){ - return getHeaderProperties()->getContentEncoding(); -} - -FieldTable& Message::getHeaders(){ - return getHeaderProperties()->getHeaders(); -} - -u_int8_t Message::getDeliveryMode(){ - return getHeaderProperties()->getDeliveryMode(); -} - -u_int8_t Message::getPriority(){ - return getHeaderProperties()->getPriority(); -} - -const std::string& Message::getCorrelationId(){ - return getHeaderProperties()->getCorrelationId(); -} - -const std::string& Message::getReplyTo(){ - return getHeaderProperties()->getReplyTo(); -} - -const std::string& Message::getExpiration(){ - return getHeaderProperties()->getExpiration(); -} - -const std::string& Message::getMessageId(){ - return getHeaderProperties()->getMessageId(); -} - -u_int64_t Message::getTimestamp(){ - return getHeaderProperties()->getTimestamp(); -} - -const std::string& Message::getType(){ - return getHeaderProperties()->getType(); -} - -const std::string& Message::getUserId(){ - return getHeaderProperties()->getUserId(); -} - -const std::string& Message::getAppId(){ - return getHeaderProperties()->getAppId(); -} - -const std::string& Message::getClusterId(){ - return getHeaderProperties()->getClusterId(); -} - -void Message::setContentType(const std::string& type){ - getHeaderProperties()->setContentType(type); -} - -void Message::setContentEncoding(const std::string& encoding){ - getHeaderProperties()->setContentEncoding(encoding); -} - -void Message::setHeaders(const FieldTable& headers){ - getHeaderProperties()->setHeaders(headers); -} - -void Message::setDeliveryMode(u_int8_t mode){ - getHeaderProperties()->setDeliveryMode(mode); -} - -void Message::setPriority(u_int8_t priority){ - getHeaderProperties()->setPriority(priority); -} - -void Message::setCorrelationId(const std::string& correlationId){ - getHeaderProperties()->setCorrelationId(correlationId); -} - -void Message::setReplyTo(const std::string& replyTo){ - getHeaderProperties()->setReplyTo(replyTo); -} - -void Message::setExpiration(const std::string& expiration){ - getHeaderProperties()->setExpiration(expiration); -} - -void Message::setMessageId(const std::string& messageId){ - getHeaderProperties()->setMessageId(messageId); -} - -void Message::setTimestamp(u_int64_t timestamp){ - getHeaderProperties()->setTimestamp(timestamp); -} - -void Message::setType(const std::string& type){ - getHeaderProperties()->setType(type); -} - -void Message::setUserId(const std::string& userId){ - getHeaderProperties()->setUserId(userId); -} - -void Message::setAppId(const std::string& appId){ - getHeaderProperties()->setAppId(appId); -} - -void Message::setClusterId(const std::string& clusterId){ - getHeaderProperties()->setClusterId(clusterId); -} diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h deleted file mode 100644 index 28e6adc608..0000000000 --- a/cpp/src/qpid/client/Message.h +++ /dev/null @@ -1,89 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <qpid/framing/amqp_framing.h> - -#ifndef _Message_ -#define _Message_ - - -namespace qpid { -namespace client { - - class Message{ - qpid::framing::AMQHeaderBody::shared_ptr header; - std::string data; - bool redelivered; - u_int64_t deliveryTag; - - qpid::framing::BasicHeaderProperties* getHeaderProperties(); - Message(qpid::framing::AMQHeaderBody::shared_ptr& header); - public: - Message(); - ~Message(); - - inline std::string getData(){ return data; } - inline void setData(const std::string& _data){ data = _data; } - - inline bool isRedelivered(){ return redelivered; } - inline void setRedelivered(bool _redelivered){ redelivered = _redelivered; } - - inline u_int64_t getDeliveryTag(){ return deliveryTag; } - - const std::string& getContentType(); - const std::string& getContentEncoding(); - qpid::framing::FieldTable& getHeaders(); - u_int8_t getDeliveryMode(); - u_int8_t getPriority(); - const std::string& getCorrelationId(); - const std::string& getReplyTo(); - const std::string& getExpiration(); - const std::string& getMessageId(); - u_int64_t getTimestamp(); - const std::string& getType(); - const std::string& getUserId(); - const std::string& getAppId(); - const std::string& getClusterId(); - - void setContentType(const std::string& type); - void setContentEncoding(const std::string& encoding); - void setHeaders(const qpid::framing::FieldTable& headers); - void setDeliveryMode(u_int8_t mode); - void setPriority(u_int8_t priority); - void setCorrelationId(const std::string& correlationId); - void setReplyTo(const std::string& replyTo); - void setExpiration(const std::string& expiration); - void setMessageId(const std::string& messageId); - void setTimestamp(u_int64_t timestamp); - void setType(const std::string& type); - void setUserId(const std::string& userId); - void setAppId(const std::string& appId); - void setClusterId(const std::string& clusterId); - - - friend class Channel; - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/MessageListener.cpp b/cpp/src/qpid/client/MessageListener.cpp deleted file mode 100644 index 664ac06849..0000000000 --- a/cpp/src/qpid/client/MessageListener.cpp +++ /dev/null @@ -1,24 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/client/MessageListener.h> - -qpid::client::MessageListener::~MessageListener() {} diff --git a/cpp/src/qpid/client/MessageListener.h b/cpp/src/qpid/client/MessageListener.h deleted file mode 100644 index 69e3089cdb..0000000000 --- a/cpp/src/qpid/client/MessageListener.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _MessageListener_ -#define _MessageListener_ - -#include <qpid/client/Message.h> - -namespace qpid { -namespace client { - - class MessageListener{ - public: - virtual ~MessageListener(); - virtual void received(Message& msg) = 0; - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/MethodBodyInstances.h b/cpp/src/qpid/client/MethodBodyInstances.h deleted file mode 100644 index f961ad183b..0000000000 --- a/cpp/src/qpid/client/MethodBodyInstances.h +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <qpid/framing/amqp_framing.h> - -/** - * This file replaces the auto-generated instances in the former - * amqp_methods.h file. Add additional instances as needed. - */ - -#ifndef _MethodBodyInstances_h_ -#define _MethodBodyInstances_h_ - -namespace qpid { -namespace client { - -class MethodBodyInstances -{ -private: - qpid::framing::ProtocolVersion version; -public: - const qpid::framing::BasicCancelOkBody basic_cancel_ok; - const qpid::framing::BasicConsumeOkBody basic_consume_ok; - const qpid::framing::BasicDeliverBody basic_deliver; - const qpid::framing::BasicGetEmptyBody basic_get_empty; - const qpid::framing::BasicGetOkBody basic_get_ok; - const qpid::framing::BasicQosOkBody basic_qos_ok; - const qpid::framing::BasicReturnBody basic_return; - const qpid::framing::ChannelCloseBody channel_close; - const qpid::framing::ChannelCloseOkBody channel_close_ok; - const qpid::framing::ChannelFlowBody channel_flow; - const qpid::framing::ChannelOpenOkBody channel_open_ok; - const qpid::framing::ConnectionCloseBody connection_close; - const qpid::framing::ConnectionCloseOkBody connection_close_ok; - const qpid::framing::ConnectionOpenOkBody connection_open_ok; - const qpid::framing::ConnectionRedirectBody connection_redirect; - const qpid::framing::ConnectionStartBody connection_start; - const qpid::framing::ConnectionTuneBody connection_tune; - const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok; - const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok; - const qpid::framing::QueueDeclareOkBody queue_declare_ok; - const qpid::framing::QueueDeleteOkBody queue_delete_ok; - const qpid::framing::QueueBindOkBody queue_bind_ok; - const qpid::framing::TxCommitOkBody tx_commit_ok; - const qpid::framing::TxRollbackOkBody tx_rollback_ok; - const qpid::framing::TxSelectOkBody tx_select_ok; - - MethodBodyInstances(u_int8_t major, u_int8_t minor) : - version(major, minor), - basic_cancel_ok(version), - basic_consume_ok(version), - basic_deliver(version), - basic_get_empty(version), - basic_get_ok(version), - basic_qos_ok(version), - basic_return(version), - channel_close(version), - channel_close_ok(version), - channel_flow(version), - channel_open_ok(version), - connection_close(version), - connection_close_ok(version), - connection_open_ok(version), - connection_redirect(version), - connection_start(version), - connection_tune(version), - exchange_declare_ok(version), - exchange_delete_ok(version), - queue_declare_ok(version), - queue_delete_ok(version), - queue_bind_ok(version), - tx_commit_ok(version), - tx_rollback_ok(version), - tx_select_ok(version) - {} - -}; - -static MethodBodyInstances method_bodies(8, 0); - -} // namespace client -} // namespace qpid - -#endif diff --git a/cpp/src/qpid/client/Queue.cpp b/cpp/src/qpid/client/Queue.cpp deleted file mode 100644 index d30cf8fc80..0000000000 --- a/cpp/src/qpid/client/Queue.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <qpid/client/Queue.h> - -qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true){} - -qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false){} - -qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp){} - -qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive) - : name(_name), autodelete(_autodelete), exclusive(_exclusive){} - -const std::string& qpid::client::Queue::getName() const{ - return name; -} - -void qpid::client::Queue::setName(const std::string& _name){ - name = _name; -} - -bool qpid::client::Queue::isAutoDelete() const{ - return autodelete; -} - -bool qpid::client::Queue::isExclusive() const{ - return exclusive; -} - - - - diff --git a/cpp/src/qpid/client/Queue.h b/cpp/src/qpid/client/Queue.h deleted file mode 100644 index df7235e4ab..0000000000 --- a/cpp/src/qpid/client/Queue.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _Queue_ -#define _Queue_ - -namespace qpid { -namespace client { - - class Queue{ - std::string name; - const bool autodelete; - const bool exclusive; - - public: - - Queue(); - Queue(std::string name); - Queue(std::string name, bool temp); - Queue(std::string name, bool autodelete, bool exclusive); - const std::string& getName() const; - void setName(const std::string&); - bool isAutoDelete() const; - bool isExclusive() const; - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/ResponseHandler.cpp b/cpp/src/qpid/client/ResponseHandler.cpp deleted file mode 100644 index 4e14d9938b..0000000000 --- a/cpp/src/qpid/client/ResponseHandler.cpp +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <qpid/client/ResponseHandler.h> -#include <qpid/sys/Monitor.h> -#include <qpid/QpidError.h> - -using namespace qpid::sys; - -qpid::client::ResponseHandler::ResponseHandler() : waiting(false){} - -qpid::client::ResponseHandler::~ResponseHandler(){} - -bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ - return expected.match(response.get()); -} - -void qpid::client::ResponseHandler::waitForResponse(){ - Monitor::ScopedLock l(monitor); - if(waiting){ - monitor.wait(); - } -} - -void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){ - response = _response; - Monitor::ScopedLock l(monitor); - waiting = false; - monitor.notify(); -} - -void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ - Monitor::ScopedLock l(monitor); - if(waiting){ - monitor.wait(); - } - if(!validate(expected)){ - THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); - } -} - -void qpid::client::ResponseHandler::expect(){ - waiting = true; -} diff --git a/cpp/src/qpid/client/ResponseHandler.h b/cpp/src/qpid/client/ResponseHandler.h deleted file mode 100644 index 7584ed6419..0000000000 --- a/cpp/src/qpid/client/ResponseHandler.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <qpid/framing/amqp_framing.h> -#include <qpid/sys/Monitor.h> - -#ifndef _ResponseHandler_ -#define _ResponseHandler_ - -namespace qpid { - namespace client { - - class ResponseHandler{ - bool waiting; - qpid::framing::AMQMethodBody::shared_ptr response; - qpid::sys::Monitor monitor; - - public: - ResponseHandler(); - ~ResponseHandler(); - inline bool isWaiting(){ return waiting; } - inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; } - bool validate(const qpid::framing::AMQMethodBody& expected); - void waitForResponse(); - void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response); - void receive(const qpid::framing::AMQMethodBody& expected); - void expect();//must be called before calling receive - }; - - } -} - - -#endif diff --git a/cpp/src/qpid/client/ReturnedMessageHandler.cpp b/cpp/src/qpid/client/ReturnedMessageHandler.cpp deleted file mode 100644 index 099477c06b..0000000000 --- a/cpp/src/qpid/client/ReturnedMessageHandler.cpp +++ /dev/null @@ -1,24 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/client/ReturnedMessageHandler.h> - -qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {} diff --git a/cpp/src/qpid/client/ReturnedMessageHandler.h b/cpp/src/qpid/client/ReturnedMessageHandler.h deleted file mode 100644 index c1b5ee92f1..0000000000 --- a/cpp/src/qpid/client/ReturnedMessageHandler.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _ReturnedMessageHandler_ -#define _ReturnedMessageHandler_ - -#include <qpid/client/Message.h> - -namespace qpid { -namespace client { - - class ReturnedMessageHandler{ - public: - virtual ~ReturnedMessageHandler(); - virtual void returned(Message& msg) = 0; - }; - -} -} - - -#endif |