diff options
Diffstat (limited to 'cpp/src/client')
27 files changed, 3172 insertions, 0 deletions
diff --git a/cpp/src/client/AckMode.h b/cpp/src/client/AckMode.h new file mode 100644 index 0000000000..9ad5ef925c --- /dev/null +++ b/cpp/src/client/AckMode.h @@ -0,0 +1,102 @@ +#ifndef _client_AckMode_h +#define _client_AckMode_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace qpid { +namespace client { + +/** + * The available acknowledgements modes. + * + * \ingroup clientapi + */ +enum AckMode { + /** No acknowledgement will be sent, broker can + discard messages as soon as they are delivered + to a consumer using this mode. **/ + NO_ACK = 0, + /** Each message will be automatically + acknowledged as soon as it is delivered to the + application **/ + AUTO_ACK = 1, + /** Acknowledgements will be sent automatically, + but not for each message. **/ + LAZY_ACK = 2, + /** The application is responsible for explicitly + acknowledging messages. **/ + CLIENT_ACK = 3 +}; + +}} // namespace qpid::client + + + +#endif /*!_client_AckMode_h*/ +#ifndef _client_AckMode_h +#define _client_AckMode_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace qpid { +namespace client { + +/** + * The available acknowledgements modes. + * + * \ingroup clientapi + */ +enum AckMode { + /** No acknowledgement will be sent, broker can + discard messages as soon as they are delivered + to a consumer using this mode. **/ + NO_ACK = 0, + /** Each message will be automatically + acknowledged as soon as it is delivered to the + application **/ + AUTO_ACK = 1, + /** Acknowledgements will be sent automatically, + but not for each message. **/ + LAZY_ACK = 2, + /** The application is responsible for explicitly + acknowledging messages. **/ + CLIENT_ACK = 3 +}; + +}} // namespace qpid::client + + + +#endif /*!_client_AckMode_h*/ diff --git a/cpp/src/client/BasicMessageChannel.cpp b/cpp/src/client/BasicMessageChannel.cpp new file mode 100644 index 0000000000..26c3fe543c --- /dev/null +++ b/cpp/src/client/BasicMessageChannel.cpp @@ -0,0 +1,395 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "BasicMessageChannel.h" +#include "../framing/AMQMethodBody.h" +#include "ClientChannel.h" +#include "ReturnedMessageHandler.h" +#include "MessageListener.h" +#include "../framing/FieldTable.h" +#include "Connection.h" +#include <queue> +#include <iostream> +#include <boost/format.hpp> +#include <boost/variant.hpp> + +namespace qpid { +namespace client { + +using namespace std; +using namespace sys; +using namespace framing; +using boost::format; + +namespace { + +// Destination name constants +const std::string BASIC_GET("__basic_get__"); +const std::string BASIC_RETURN("__basic_return__"); + +// Reference name constant +const std::string BASIC_REF("__basic_reference__"); +} + +class BasicMessageChannel::WaitableDestination : + public IncomingMessage::Destination +{ + public: + WaitableDestination() : shutdownFlag(false) {} + void message(const Message& msg) { + Mutex::ScopedLock l(monitor); + queue.push(msg); + monitor.notify(); + } + + void empty() { + Mutex::ScopedLock l(monitor); + queue.push(Empty()); + monitor.notify(); + } + + bool wait(Message& msgOut) { + Mutex::ScopedLock l(monitor); + while (queue.empty() && !shutdownFlag) + monitor.wait(); + if (shutdownFlag) + return false; + Message* msg = boost::get<Message>(&queue.front()); + bool success = msg; + if (success) + msgOut=*msg; + queue.pop(); + if (!queue.empty()) + monitor.notify(); // Wake another waiter. + return success; + } + + void shutdown() { + Mutex::ScopedLock l(monitor); + shutdownFlag = true; + monitor.notifyAll(); + } + + private: + struct Empty {}; + typedef boost::variant<Message,Empty> Item; + sys::Monitor monitor; + std::queue<Item> queue; + bool shutdownFlag; +}; + + +BasicMessageChannel::BasicMessageChannel(Channel& ch) + : channel(ch), returnsHandler(0), + destGet(new WaitableDestination()), + destDispatch(new WaitableDestination()) +{ + incoming.addDestination(BASIC_RETURN, *destDispatch); +} + +void BasicMessageChannel::consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) +{ + { + // Note we create a consumer even if tag="". In that case + // It will be renamed when we handle BasicConsumeOkBody. + // + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + THROW_QPID_ERROR(CLIENT_ERROR, + "Consumer already exists with tag="+tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.lastDeliveryTag = 0; + } + + // FIXME aconway 2007-03-23: get processed in both. + + // BasicConsumeOkBody is really processed in handle(), here + // we just pick up the tag to return to the user. + // + // We can't process it here because messages for the consumer may + // already be arriving. + // + BasicConsumeOkBody::shared_ptr ok = + channel.sendAndReceiveSync<BasicConsumeOkBody>( + synch, + new BasicConsumeBody( + channel.version, 0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, + fields ? *fields : FieldTable())); + tag = ok->getConsumerTag(); +} + + +void BasicMessageChannel::cancel(const std::string& tag, bool synch) { + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); + } + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + channel.sendAndReceiveSync<BasicCancelOkBody>( + synch, new BasicCancelBody(channel.version, tag, !synch)); +} + +void BasicMessageChannel::close(){ + ConsumerMap consumersCopy; + { + Mutex::ScopedLock l(lock); + consumersCopy = consumers; + consumers.clear(); + } + destGet->shutdown(); + destDispatch->shutdown(); + for (ConsumerMap::iterator i=consumersCopy.begin(); + i != consumersCopy.end(); ++i) + { + Consumer& c = i->second; + if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) + && c.lastDeliveryTag > 0) + { + channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + } + } +} + + +bool BasicMessageChannel::get( + Message& msg, const Queue& queue, AckMode ackMode) +{ + // Prepare for incoming response + incoming.addDestination(BASIC_GET, *destGet); + channel.send( + new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); + bool got = destGet->wait(msg); + return got; +} + +void BasicMessageChannel::publish( + const Message& msg, const Exchange& exchange, + const std::string& routingKey, bool mandatory, bool immediate) +{ + const string e = exchange.getName(); + string key = routingKey; + + // Make a header for the message + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + BasicHeaderProperties::copy( + *static_cast<BasicHeaderProperties*>(header->getProperties()), msg); + header->setContentSize(msg.getData().size()); + + channel.send( + new BasicPublishBody( + channel.version, 0, e, key, mandatory, immediate)); + channel.send(header); + string data = msg.getData(); + u_int64_t data_length = data.length(); + if(data_length > 0){ + //frame itself uses 8 bytes + u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8; + if(data_length < frag_size){ + channel.send(new AMQContentBody(data)); + }else{ + u_int32_t offset = 0; + u_int32_t remaining = data_length - offset; + while (remaining > 0) { + u_int32_t length = remaining > frag_size ? frag_size : remaining; + string frag(data.substr(offset, length)); + channel.send(new AMQContentBody(frag)); + + offset += length; + remaining = data_length - offset; + } + } + } +} + +void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { + assert(method->amqpClassId() ==BasicGetBody::CLASS_ID); + switch(method->amqpMethodId()) { + case BasicGetOkBody::METHOD_ID: { + incoming.openReference(BASIC_REF); + incoming.createMessage(BASIC_GET, BASIC_REF); + return; + } + case BasicGetEmptyBody::METHOD_ID: { + incoming.getDestination(BASIC_GET).empty(); + incoming.removeDestination(BASIC_GET); + return; + } + case BasicDeliverBody::METHOD_ID: { + BasicDeliverBody::shared_ptr deliver= + boost::shared_polymorphic_downcast<BasicDeliverBody>(method); + incoming.openReference(BASIC_REF); + Message& msg = incoming.createMessage( + deliver->getConsumerTag(), BASIC_REF); + msg.setDestination(deliver->getConsumerTag()); + msg.setDeliveryTag(deliver->getDeliveryTag()); + msg.setRedelivered(deliver->getRedelivered()); + return; + } + case BasicReturnBody::METHOD_ID: { + incoming.openReference(BASIC_REF); + incoming.createMessage(BASIC_RETURN, BASIC_REF); + return; + } + case BasicConsumeOkBody::METHOD_ID: { + Mutex::ScopedLock l(lock); + BasicConsumeOkBody::shared_ptr consumeOk = + boost::shared_polymorphic_downcast<BasicConsumeOkBody>(method); + std::string tag = consumeOk->getConsumerTag(); + ConsumerMap::iterator i = consumers.find(std::string()); + if (i != consumers.end()) { + // Need to rename the un-named consumer. + if (consumers.find(tag) == consumers.end()) { + consumers[tag] = i->second; + consumers.erase(i); + } + else // Tag already exists. + throw ChannelException(404, "Tag already exists: "+tag); + } + // FIXME aconway 2007-03-23: Integrate consumer & destination + // maps. + incoming.addDestination(tag, *destDispatch); + return; + } + } + throw Channel::UnknownMethod(); +} + +void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr header) { + BasicHeaderProperties* props = + boost::polymorphic_downcast<BasicHeaderProperties*>( + header->getProperties()); + IncomingMessage::Reference& ref = incoming.getReference(BASIC_REF); + assert (ref.messages.size() == 1); + ref.messages.front().BasicHeaderProperties::operator=(*props); + incoming_size = header->getContentSize(); + if (incoming_size==0) + incoming.closeReference(BASIC_REF); +} + +void BasicMessageChannel::handle(AMQContentBody::shared_ptr content){ + incoming.appendReference(BASIC_REF, content->getData()); + size_t size = incoming.getReference(BASIC_REF).data.size(); + if (size >= incoming_size) { + incoming.closeReference(BASIC_REF); + if (size > incoming_size) + throw ChannelException(502, "Content exceeded declared size"); + } +} + +void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){ + //record delivery tag: + consumer.lastDeliveryTag = msg.getDeliveryTag(); + + //allow registered listener to handle the message + consumer.listener->received(msg); + + if(channel.isOpen()){ + bool multiple(false); + switch(consumer.ackMode){ + case LAZY_ACK: + multiple = true; + if(++(consumer.count) < channel.getPrefetch()) + break; + //else drop-through + case AUTO_ACK: + consumer.lastDeliveryTag = 0; + channel.send( + new BasicAckBody( + channel.version, + msg.getDeliveryTag(), + multiple)); + case NO_ACK: // Nothing to do + case CLIENT_ACK: // User code must ack. + break; + // TODO aconway 2007-02-22: Provide a way for user + // to ack! + } + } + + //as it stands, transactionality is entirely orthogonal to ack + //mode, though the acks will not be processed by the broker under + //a transaction until it commits. +} + + +void BasicMessageChannel::run() { + while(channel.isOpen()) { + try { + Message msg; + bool gotMessge = destDispatch->wait(msg); + if (gotMessge) { + if(msg.getDestination() == BASIC_RETURN) { + ReturnedMessageHandler* handler=0; + { + Mutex::ScopedLock l(lock); + handler=returnsHandler; + } + if(handler != 0) + handler->returned(msg); + } + else { + Consumer consumer; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find( + msg.getDestination()); + if(i == consumers.end()) + THROW_QPID_ERROR(PROTOCOL_ERROR+504, + "Unknown consumer tag=" + + msg.getDestination()); + consumer = i->second; + } + deliver(consumer, msg); + } + } + } + catch (const ShutdownException&) { + /* Orderly shutdown */ + } + catch (const Exception& e) { + // FIXME aconway 2007-02-20: Report exception to user. + cout << "client::BasicMessageChannel::run() terminated by: " + << e.toString() << endl; + } + } +} + +void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ + Mutex::ScopedLock l(lock); + returnsHandler = handler; +} + +void BasicMessageChannel::setQos(){ + channel.sendAndReceive<BasicQosOkBody>( + new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)); + if(channel.isTransactional()) + channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version)); +} + +}} // namespace qpid::client diff --git a/cpp/src/client/BasicMessageChannel.h b/cpp/src/client/BasicMessageChannel.h new file mode 100644 index 0000000000..aaedfd6bf1 --- /dev/null +++ b/cpp/src/client/BasicMessageChannel.h @@ -0,0 +1,91 @@ +#ifndef _client_BasicMessageChannel_h +#define _client_BasicMessageChannel_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "MessageChannel.h" +#include "IncomingMessage.h" +#include <boost/scoped_ptr.hpp> + +namespace qpid { +namespace client { +/** + * Messaging implementation using AMQP 0-8 BasicMessageChannel class + * to send and receiving messages. + */ +class BasicMessageChannel : public MessageChannel +{ + public: + BasicMessageChannel(Channel& parent); + + void consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, + const framing::FieldTable* fields = 0); + + void cancel(const std::string& tag, bool synch = true); + + bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); + + void publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory = false, bool immediate = false); + + void setReturnedMessageHandler(ReturnedMessageHandler* handler); + + void run(); + + void handle(boost::shared_ptr<framing::AMQMethodBody>); + + void handle(shared_ptr<framing::AMQHeaderBody>); + + void handle(shared_ptr<framing::AMQContentBody>); + + void setQos(); + + void close(); + + private: + + class WaitableDestination; + struct Consumer{ + MessageListener* listener; + AckMode ackMode; + int count; + u_int64_t lastDeliveryTag; + }; + typedef std::map<std::string, Consumer> ConsumerMap; + + void deliver(Consumer& consumer, Message& msg); + + sys::Mutex lock; + Channel& channel; + IncomingMessage incoming; + uint64_t incoming_size; + ConsumerMap consumers ; + ReturnedMessageHandler* returnsHandler; + boost::scoped_ptr<WaitableDestination> destGet; + boost::scoped_ptr<WaitableDestination> destDispatch; +}; + +}} // namespace qpid::client + + + +#endif /*!_client_BasicMessageChannel_h*/ diff --git a/cpp/src/client/ClientAdapter.cpp b/cpp/src/client/ClientAdapter.cpp new file mode 100644 index 0000000000..4bf91f915b --- /dev/null +++ b/cpp/src/client/ClientAdapter.cpp @@ -0,0 +1,70 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AMQP_ClientOperations.h" +#include "ClientAdapter.h" +#include "Connection.h" +#include "../Exception.h" +#include "../framing/AMQMethodBody.h" + +namespace qpid { +namespace client { + +using namespace qpid; +using namespace qpid::framing; + +typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + +void ClientAdapter::handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const MethodContext& context +) +{ + try{ + method->invoke(*clientOps, context); + }catch(ChannelException& e){ + connection.client->getChannel().close( + context, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + connection.closeChannel(getId()); + }catch(ConnectionException& e){ + connection.client->getConnection().close( + context, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + connection.client->getConnection().close( + context, 541/*internal error*/, e.what(), + method->amqpClassId(), method->amqpMethodId()); + } +} + +void ClientAdapter::handleHeader(AMQHeaderBody::shared_ptr body) { + channel->handleHeader(body); +} + +void ClientAdapter::handleContent(AMQContentBody::shared_ptr body) { + channel->handleContent(body); +} + +void ClientAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) { + // TODO aconway 2007-01-17: Implement heartbeats. +} + + + +}} // namespace qpid::client + diff --git a/cpp/src/client/ClientAdapter.h b/cpp/src/client/ClientAdapter.h new file mode 100644 index 0000000000..ca029a793f --- /dev/null +++ b/cpp/src/client/ClientAdapter.h @@ -0,0 +1,66 @@ +#ifndef _client_ClientAdapter_h +#define _client_ClientAdapter_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "../framing/ChannelAdapter.h" +#include "ClientChannel.h" + +namespace qpid { +namespace client { + +class AMQMethodBody; +class Connection; + +/** + * Per-channel protocol adapter. + * + * Translates protocol bodies into calls on the core Channel, + * Connection and Client objects. + * + * Owns a channel, has references to Connection and Client. + */ +class ClientAdapter : public framing::ChannelAdapter +{ + public: + ClientAdapter(std::auto_ptr<Channel> ch, Connection&, Client&); + Channel& getChannel() { return *channel; } + + void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); + void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); + void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); + + private: + void handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const framing::MethodContext& context); + + class ClientOps; + + std::auto_ptr<Channel> channel; + Connection& connection; + Client& client; + boost::shared_ptr<ClientOps> clientOps; +}; + +}} // namespace qpid::client + + + +#endif /*!_client_ClientAdapter_h*/ diff --git a/cpp/src/client/ClientChannel.cpp b/cpp/src/client/ClientChannel.cpp new file mode 100644 index 0000000000..eda872fc30 --- /dev/null +++ b/cpp/src/client/ClientChannel.cpp @@ -0,0 +1,340 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include "ClientChannel.h" +#include "../sys/Monitor.h" +#include "ClientMessage.h" +#include "../QpidError.h" +#include "MethodBodyInstances.h" +#include "Connection.h" +#include "BasicMessageChannel.h" +// FIXME aconway 2007-03-21: +//#include "MessageMessageChannel.h" + +// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent +// handling of errors that should close the connection or the channel. +// Make sure the user thread receives a connection in each case. +// +using namespace std; +using namespace boost; +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::sys; + +Channel::Channel(bool _transactional, u_int16_t _prefetch, + MessageChannel* impl) : + // FIXME aconway 2007-03-21: MessageMessageChannel + messaging(impl ? impl : new BasicMessageChannel(*this)), + connection(0), + prefetch(_prefetch), + transactional(_transactional) +{ } + +Channel::~Channel(){ + close(); +} + +void Channel::open(ChannelId id, Connection& con) +{ + if (isOpen()) + THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); + connection = &con; + init(id, con, con.getVersion()); // ChannelAdapter initialization. + string oob; + if (id != 0) + sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob)); +} + +void Channel::protocolInit( + const std::string& uid, const std::string& pwd, const std::string& vhost) { + assert(connection); + responses.expect(); + connection->connector->init(); // Send ProtocolInit block. + ConnectionStartBody::shared_ptr connectionStart = + responses.receive<ConnectionStartBody>(); + + FieldTable props; + string mechanism("PLAIN"); + string response = ((char)0) + uid + ((char)0) + pwd; + string locale("en_US"); + ConnectionTuneBody::shared_ptr proposal = + sendAndReceive<ConnectionTuneBody>( + new ConnectionStartOkBody( + version, connectionStart->getRequestId(), + props, mechanism, + response, locale)); + + /** + * Assume for now that further challenges will not be required + //receive connection.secure + responses.receive(connection_secure)); + //send connection.secure-ok + connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); + **/ + + send(new ConnectionTuneOkBody( + version, proposal->getRequestId(), + proposal->getChannelMax(), connection->getMaxFrameSize(), + proposal->getHeartbeat())); + + uint16_t heartbeat = proposal->getHeartbeat(); + connection->connector->setReadTimeout(heartbeat * 2); + connection->connector->setWriteTimeout(heartbeat); + + // Send connection open. + std::string capabilities; + responses.expect(); + send(new ConnectionOpenBody(version, vhost, capabilities, true)); + //receive connection.open-ok (or redirect, but ignore that for now + //esp. as using force=true). + AMQMethodBody::shared_ptr openResponse = responses.receive(); + if(openResponse->isA<ConnectionOpenOkBody>()) { + //ok + }else if(openResponse->isA<ConnectionRedirectBody>()){ + //ignore for now + ConnectionRedirectBody::shared_ptr redirect( + shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse)); + cout << "Received redirection to " << redirect->getHost() + << endl; + } else { + THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open"); + } +} + +bool Channel::isOpen() const { return connection; } + +void Channel::setQos() { + messaging->setQos(); +} + +void Channel::setPrefetch(uint16_t _prefetch){ + prefetch = _prefetch; + setQos(); +} + +void Channel::declareExchange(Exchange& exchange, bool synch){ + string name = exchange.getName(); + string type = exchange.getType(); + FieldTable args; + sendAndReceiveSync<ExchangeDeclareOkBody>( + synch, + new ExchangeDeclareBody( + version, 0, name, type, false, false, false, false, !synch, args)); +} + +void Channel::deleteExchange(Exchange& exchange, bool synch){ + string name = exchange.getName(); + sendAndReceiveSync<ExchangeDeleteOkBody>( + synch, + new ExchangeDeleteBody(version, 0, name, false, !synch)); +} + +void Channel::declareQueue(Queue& queue, bool synch){ + string name = queue.getName(); + FieldTable args; + QueueDeclareOkBody::shared_ptr response = + sendAndReceiveSync<QueueDeclareOkBody>( + synch, + new QueueDeclareBody( + version, 0, name, false/*passive*/, queue.isDurable(), + queue.isExclusive(), queue.isAutoDelete(), !synch, args)); + if(synch) { + if(queue.getName().length() == 0) + queue.setName(response->getQueue()); + } +} + +void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ + //ticket, queue, ifunused, ifempty, nowait + string name = queue.getName(); + sendAndReceiveSync<QueueDeleteOkBody>( + synch, + new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); +} + +void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ + string e = exchange.getName(); + string q = queue.getName(); + sendAndReceiveSync<QueueBindOkBody>( + synch, + new QueueBindBody(version, 0, q, e, key,!synch, args)); +} + +void Channel::commit(){ + sendAndReceive<TxCommitOkBody>(new TxCommitBody(version)); +} + +void Channel::rollback(){ + sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version)); +} + +void Channel::handleMethodInContext( + AMQMethodBody::shared_ptr method, const MethodContext&) +{ + // TODO aconway 2007-03-23: Special case for consume OK as it + // is both an expected response and needs handling in this thread. + // Need to review & reationalize the client-side processing model. + if (method->isA<BasicConsumeOkBody>()) { + messaging->handle(method); + responses.signalResponse(method); + return; + } + if(responses.isWaiting()) { + responses.signalResponse(method); + return; + } + try { + switch (method->amqpClassId()) { + case BasicDeliverBody::CLASS_ID: messaging->handle(method); break; + case ChannelCloseBody::CLASS_ID: handleChannel(method); break; + case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; + default: throw UnknownMethod(); + } + } + catch (const UnknownMethod&) { + connection->close( + 504, "Unknown method", + method->amqpClassId(), method->amqpMethodId()); + } + } + +void Channel::handleChannel(AMQMethodBody::shared_ptr method) { + switch (method->amqpMethodId()) { + case ChannelCloseBody::METHOD_ID: + peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method)); + return; + case ChannelFlowBody::METHOD_ID: + // FIXME aconway 2007-02-22: Not yet implemented. + return; + } + throw UnknownMethod(); +} + +void Channel::handleConnection(AMQMethodBody::shared_ptr method) { + if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) { + connection->close(); + return; + } + throw UnknownMethod(); +} + +void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ + messaging->handle(body); +} + +void Channel::handleContent(AMQContentBody::shared_ptr body){ + messaging->handle(body); +} + +void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); +} + +void Channel::start(){ + dispatcher = Thread(*messaging); +} + +// Close called by local application. +void Channel::close( + uint16_t code, const std::string& text, + ClassId classId, MethodId methodId) +{ + if (isOpen()) { + try { + if (getId() != 0) { + sendAndReceive<ChannelCloseOkBody>( + new ChannelCloseBody( + version, code, text, classId, methodId)); + } + static_cast<ConnectionForChannel*>(connection)->erase(getId()); + closeInternal(); + } catch (...) { + static_cast<ConnectionForChannel*>(connection)->erase(getId()); + closeInternal(); + throw; + } + } +} + +// Channel closed by peer. +void Channel::peerClose(ChannelCloseBody::shared_ptr) { + assert(isOpen()); + closeInternal(); +} + +void Channel::closeInternal() { + if (isOpen()); + { + messaging->close(); + connection = 0; + // A 0 response means we are closed. + responses.signalResponse(AMQMethodBody::shared_ptr()); + } + dispatcher.join(); +} + +AMQMethodBody::shared_ptr Channel::sendAndReceive( + AMQMethodBody* toSend, ClassId c, MethodId m) +{ + responses.expect(); + send(toSend); + return responses.receive(c, m); +} + +AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( + bool sync, AMQMethodBody* body, ClassId c, MethodId m) +{ + if(sync) + return sendAndReceive(body, c, m); + else { + send(body); + return AMQMethodBody::shared_ptr(); + } +} + +void Channel::consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { + messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields); +} + +void Channel::cancel(const std::string& tag, bool synch) { + messaging->cancel(tag, synch); +} + +bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { + return messaging->get(msg, queue, ackMode); +} + +void Channel::publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory, bool immediate) { + messaging->publish(msg, exchange, routingKey, mandatory, immediate); +} + +void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) { + messaging->setReturnedMessageHandler(handler); +} + +void Channel::run() { + messaging->run(); +} + diff --git a/cpp/src/client/ClientChannel.h b/cpp/src/client/ClientChannel.h new file mode 100644 index 0000000000..a7e0d2ec31 --- /dev/null +++ b/cpp/src/client/ClientChannel.h @@ -0,0 +1,352 @@ +#ifndef _client_ClientChannel_h +#define _client_ClientChannel_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/scoped_ptr.hpp> +#include "../framing/amqp_framing.h" +#include "ClientExchange.h" +#include "ClientMessage.h" +#include "ClientQueue.h" +#include "ResponseHandler.h" +#include "../framing/ChannelAdapter.h" +#include "../sys/Thread.h" +#include "AckMode.h" + +namespace qpid { + +namespace framing { +class ChannelCloseBody; +class AMQMethodBody; +} + +namespace client { + +class Connection; +class MessageChannel; +class MessageListener; +class ReturnedMessageHandler; + +/** + * Represents an AMQP channel, i.e. loosely a session of work. It + * is through a channel that most of the AMQP 'methods' are + * exposed. + * + * \ingroup clientapi + */ +class Channel : public framing::ChannelAdapter +{ + private: + struct UnknownMethod {}; + + sys::Mutex lock; + boost::scoped_ptr<MessageChannel> messaging; + Connection* connection; + sys::Thread dispatcher; + ResponseHandler responses; + + uint16_t prefetch; + const bool transactional; + framing::ProtocolVersion version; + + void handleHeader(framing::AMQHeaderBody::shared_ptr body); + void handleContent(framing::AMQContentBody::shared_ptr body); + void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); + void handleMethodInContext( + framing::AMQMethodBody::shared_ptr, const framing::MethodContext&); + void handleChannel(framing::AMQMethodBody::shared_ptr method); + void handleConnection(framing::AMQMethodBody::shared_ptr method); + + void setQos(); + + void protocolInit( + const std::string& uid, const std::string& pwd, + const std::string& vhost); + + framing::AMQMethodBody::shared_ptr sendAndReceive( + framing::AMQMethodBody*, framing::ClassId, framing::MethodId); + + framing::AMQMethodBody::shared_ptr sendAndReceiveSync( + bool sync, + framing::AMQMethodBody*, framing::ClassId, framing::MethodId); + + template <class BodyType> + boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) { + return boost::shared_polymorphic_downcast<BodyType>( + sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID)); + } + + template <class BodyType> + boost::shared_ptr<BodyType> sendAndReceiveSync( + bool sync, framing::AMQMethodBody* body) { + return boost::shared_polymorphic_downcast<BodyType>( + sendAndReceiveSync( + sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID)); + } + + void open(framing::ChannelId, Connection&); + void closeInternal(); + void peerClose(boost::shared_ptr<framing::ChannelCloseBody>); + + // FIXME aconway 2007-02-23: Get rid of friendships. + friend class Connection; + friend class BasicMessageChannel; // for sendAndReceive. + friend class MessageMessageChannel; // for sendAndReceive. + + public: + + /** + * Creates a channel object. + * + * @param transactional if true, the publishing and acknowledgement + * of messages will be transactional and can be committed or + * aborted in atomic units (@see commit(), @see rollback()) + * + * @param prefetch specifies the number of unacknowledged + * messages the channel is willing to have sent to it + * asynchronously + * + * @param messageImpl Alternate messaging implementation class to + * allow alternate protocol implementations of messaging + * operations. Takes ownership. + */ + Channel( + bool transactional = false, u_int16_t prefetch = 500, + MessageChannel* messageImpl = 0); + + ~Channel(); + + /** + * Declares an exchange. + * + * In AMQP Exchanges are the destinations to which messages + * are published. They have Queues bound to them and route + * messages they receive to those queues. The routing rules + * depend on the type of the exchange. + * + * @param exchange an Exchange object representing the + * exchange to declare + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void declareExchange(Exchange& exchange, bool synch = true); + /** + * Deletes an exchange + * + * @param exchange an Exchange object representing the exchange to delete + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void deleteExchange(Exchange& exchange, bool synch = true); + /** + * Declares a Queue + * + * @param queue a Queue object representing the queue to declare + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void declareQueue(Queue& queue, bool synch = true); + /** + * Deletes a Queue + * + * @param queue a Queue object representing the queue to delete + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); + /** + * Binds a queue to an exchange. The exact semantics of this + * (in particular how 'routing keys' and 'binding arguments' + * are used) depends on the type of the exchange. + * + * @param exchange an Exchange object representing the + * exchange to bind to + * + * @param queue a Queue object representing the queue to be + * bound + * + * @param key the 'routing key' for the binding + * + * @param args the 'binding arguments' for the binding + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void bind(const Exchange& exchange, const Queue& queue, + const std::string& key, const framing::FieldTable& args, + bool synch = true); + + /** + * For a transactional channel this will commit all + * publications and acknowledgements since the last commit (or + * the channel was opened if there has been no previous + * commit). This will cause published messages to become + * available to consumers and acknowledged messages to be + * consumed and removed from the queues they were dispatched + * from. + * + * Transactionailty of a channel is specified when the channel + * object is created (@see Channel()). + */ + void commit(); + + /** + * For a transactional channel, this will rollback any + * publications or acknowledgements. It will be as if the + * ppblished messages were never sent and the acknowledged + * messages were never consumed. + */ + void rollback(); + + /** + * Change the prefetch in use. + */ + void setPrefetch(uint16_t prefetch); + + uint16_t getPrefetch() { return prefetch; } + + /** + * Start message dispatching on a new thread + */ + void start(); + + /** + * Close the channel with optional error information. + * Closing a channel that is not open has no effect. + */ + void close( + framing::ReplyCode = 200, const std::string& ="OK", + framing::ClassId = 0, framing::MethodId = 0); + + /** True if the channel is transactional */ + bool isTransactional() { return transactional; } + + /** True if the channel is open */ + bool isOpen() const; + + /** Get the connection associated with this channel */ + Connection& getConnection() { return *connection; } + + /** Return the protocol version */ + framing::ProtocolVersion getVersion() const { return version ; } + + /** + * Creates a 'consumer' for a queue. Messages in (or arriving + * at) that queue will be delivered to consumers + * asynchronously. + * + * @param queue a Queue instance representing the queue to + * consume from + * + * @param tag an identifier to associate with the consumer + * that can be used to cancel its subscription (if empty, this + * will be assigned by the broker) + * + * @param listener a pointer to an instance of an + * implementation of the MessageListener interface. Messages + * received from this queue for this consumer will result in + * invocation of the received() method on the listener, with + * the message itself passed in. + * + * @param ackMode the mode of acknowledgement that the broker + * should assume for this consumer. @see AckMode + * + * @param noLocal if true, this consumer will not be sent any + * message published by this connection + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, + const framing::FieldTable* fields = 0); + + /** + * Cancels a subscription previously set up through a call to consume(). + * + * @param tag the identifier used (or assigned) in the consume + * request that set up the subscription to be cancelled. + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void cancel(const std::string& tag, bool synch = true); + /** + * Synchronous pull of a message from a queue. + * + * @param msg a message object that will contain the message + * headers and content if the call completes. + * + * @param queue the queue to consume from + * + * @param ackMode the acknowledgement mode to use (@see + * AckMode) + * + * @return true if a message was succcessfully dequeued from + * the queue, false if the queue was empty. + */ + bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); + + /** + * Publishes (i.e. sends a message to the broker). + * + * @param msg the message to publish + * + * @param exchange the exchange to publish the message to + * + * @param routingKey the routing key to publish with + * + * @param mandatory if true and the exchange to which this + * publish is directed has no matching bindings, the message + * will be returned (see setReturnedMessageHandler()). + * + * @param immediate if true and there is no consumer to + * receive this message on publication, the message will be + * returned (see setReturnedMessageHandler()). + */ + void publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory = false, bool immediate = false); + + /** + * Set a handler for this channel that will process any + * returned messages + * + * @see publish() + */ + void setReturnedMessageHandler(ReturnedMessageHandler* handler); + + /** + * Deliver messages from the broker to the appropriate MessageListener. + */ + void run(); + + +}; + +}} + +#endif /*!_client_ClientChannel_h*/ diff --git a/cpp/src/client/ClientExchange.cpp b/cpp/src/client/ClientExchange.cpp new file mode 100644 index 0000000000..d5914beea2 --- /dev/null +++ b/cpp/src/client/ClientExchange.cpp @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ClientExchange.h" + +qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} +const std::string& qpid::client::Exchange::getName() const { return name; } +const std::string& qpid::client::Exchange::getType() const { return type; } + +const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; +const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; +const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; + +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/cpp/src/client/ClientExchange.h b/cpp/src/client/ClientExchange.h new file mode 100644 index 0000000000..a8ac21fa9b --- /dev/null +++ b/cpp/src/client/ClientExchange.h @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> + +#ifndef _Exchange_ +#define _Exchange_ + +namespace qpid { +namespace client { + + /** + * A 'handle' used to represent an AMQP exchange in the Channel + * methods. Exchanges are the destinations to which messages are + * published. + * + * There are different types of exchange (the standard types are + * available as static constants, see DIRECT_EXCHANGE, + * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to + * an exchange using Channel::bind() and messages published to + * that exchange are then routed to the queue based on the details + * of the binding and the type of exchange. + * + * There are some standard exchange instances that are predeclared + * on all AMQP brokers. These are defined as static members + * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and + * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange + * (member DEFAULT_EXCHANGE) which is nameless and of type + * 'direct' and has every declared queue bound to it by queue + * name. + * + * \ingroup clientapi + */ + class Exchange{ + const std::string name; + const std::string type; + + public: + /** + * A direct exchange routes messages published with routing + * key X to any queue bound with key X (i.e. an exact match is + * used). + */ + static const std::string DIRECT_EXCHANGE; + /** + * A topic exchange treat the key with which a queue is bound + * as a pattern and routes all messages whose routing keys + * match that pattern to the bound queue. The routing key for + * a message must consist of zero or more alpha-numeric words + * delimited by dots. The pattern is of a similar form but * + * can be used to match excatly one word and # can be used to + * match zero or more words. + */ + static const std::string TOPIC_EXCHANGE; + /** + * The headers exchange routes messages based on whether their + * headers match the binding arguments specified when + * binding. (see the AMQP spec for more details). + */ + static const std::string HEADERS_EXCHANGE; + + /** + * The 'default' exchange, nameless and of type 'direct'. Has + * every declared queue bound to it by name. + */ + static const Exchange DEFAULT_EXCHANGE; + /** + * The standard direct exchange, named amq.direct. + */ + static const Exchange STANDARD_DIRECT_EXCHANGE; + /** + * The standard topic exchange, named amq.topic. + */ + static const Exchange STANDARD_TOPIC_EXCHANGE; + /** + * The standard headers exchange, named amq.header. + */ + static const Exchange STANDARD_HEADERS_EXCHANGE; + + Exchange(std::string name, std::string type = DIRECT_EXCHANGE); + const std::string& getName() const; + const std::string& getType() const; + }; + +} +} + + +#endif diff --git a/cpp/src/client/ClientMessage.h b/cpp/src/client/ClientMessage.h new file mode 100644 index 0000000000..dc25b4070b --- /dev/null +++ b/cpp/src/client/ClientMessage.h @@ -0,0 +1,62 @@ +#ifndef _client_ClientMessage_h +#define _client_ClientMessage_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "../framing/BasicHeaderProperties.h" + +namespace qpid { +namespace client { + +/** + * A representation of messages for sent or recived through the + * client api. + * + * \ingroup clientapi + */ +class Message : public framing::BasicHeaderProperties { + public: + Message(const std::string& data_=std::string()) : data(data_) {} + + std::string getData() const { return data; } + void setData(const std::string& _data) { data = _data; } + + std::string getDestination() const { return destination; } + void setDestination(const std::string& dest) { destination = dest; } + + // TODO aconway 2007-03-22: only needed for Basic.deliver support. + uint64_t getDeliveryTag() const { return deliveryTag; } + void setDeliveryTag(uint64_t dt) { deliveryTag = dt; } + + bool isRedelivered() const { return redelivered; } + void setRedelivered(bool _redelivered){ redelivered = _redelivered; } + + private: + std::string data; + std::string destination; + bool redelivered; + uint64_t deliveryTag; +}; + +}} + +#endif /*!_client_ClientMessage_h*/ diff --git a/cpp/src/client/ClientQueue.cpp b/cpp/src/client/ClientQueue.cpp new file mode 100644 index 0000000000..613cf8d288 --- /dev/null +++ b/cpp/src/client/ClientQueue.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ClientQueue.h" + +qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){} + +qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){} + +qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){} + +qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable) + : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){} + +const std::string& qpid::client::Queue::getName() const{ + return name; +} + +void qpid::client::Queue::setName(const std::string& _name){ + name = _name; +} + +bool qpid::client::Queue::isAutoDelete() const{ + return autodelete; +} + +bool qpid::client::Queue::isExclusive() const{ + return exclusive; +} + +bool qpid::client::Queue::isDurable() const{ + return durable; +} + +void qpid::client::Queue::setDurable(bool _durable){ + durable = _durable; +} + + + + diff --git a/cpp/src/client/ClientQueue.h b/cpp/src/client/ClientQueue.h new file mode 100644 index 0000000000..b37a44b004 --- /dev/null +++ b/cpp/src/client/ClientQueue.h @@ -0,0 +1,103 @@ +#ifndef _client_ClientQueue_h +#define _client_ClientQueue_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> + +namespace qpid { +namespace client { + + /** + * A 'handle' used to represent an AMQP queue in the Channel + * methods. Creating an instance of this class does not cause the + * queue to be created on the broker. Rather, an instance of this + * class should be passed to Channel::declareQueue() to ensure + * that the queue exists or is created. + * + * Queues hold messages and allow clients to consume + * (see Channel::consume()) or get (see Channel::get()) those messags. A + * queue receives messages by being bound to one or more Exchange; + * messages published to that exchange may then be routed to the + * queue based on the details of the binding and the type of the + * exchange (see Channel::bind()). + * + * Queues are identified by a name. They can be exclusive (in which + * case they can only be used in the context of the connection + * over which they were declared, and are deleted when then + * connection closes), or they can be shared. Shared queues can be + * auto deleted when they have no consumers. + * + * We use the term 'temporary queue' to refer to an exclusive + * queue. + * + * \ingroup clientapi + */ + class Queue{ + std::string name; + const bool autodelete; + const bool exclusive; + bool durable; + + public: + + /** + * Creates an unnamed, non-durable, temporary queue. A name + * will be assigned to this queue instance by a call to + * Channel::declareQueue(). + */ + Queue(); + /** + * Creates a shared, non-durable, queue with a given name, + * that will not be autodeleted. + * + * @param name the name of the queue + */ + Queue(std::string name); + /** + * Creates a non-durable queue with a given name. + * + * @param name the name of the queue + * + * @param temp if true the queue will be a temporary queue, if + * false it will be shared and not autodeleted. + */ + Queue(std::string name, bool temp); + /** + * This constructor allows the autodelete, exclusive and + * durable propeties to be explictly set. Note however that if + * exclusive is true, autodelete has no meaning as exclusive + * queues are always destroyed when the connection that + * created them is closed. + */ + Queue(std::string name, bool autodelete, bool exclusive, bool durable); + const std::string& getName() const; + void setName(const std::string&); + bool isAutoDelete() const; + bool isExclusive() const; + bool isDurable() const; + void setDurable(bool durable); + }; + +} +} + +#endif /*!_client_ClientQueue_h*/ diff --git a/cpp/src/client/Connection.cpp b/cpp/src/client/Connection.cpp new file mode 100644 index 0000000000..365311ab37 --- /dev/null +++ b/cpp/src/client/Connection.cpp @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <algorithm> +#include <boost/format.hpp> +#include <boost/bind.hpp> + +#include "Connection.h" +#include "ClientChannel.h" +#include "ClientMessage.h" +#include "../QpidError.h" +#include <iostream> +#include <sstream> +#include "MethodBodyInstances.h" +#include <functional> + +using namespace qpid::framing; +using namespace qpid::sys; + + +namespace qpid { +namespace client { + +const std::string Connection::OK("OK"); + +Connection::Connection( + bool _debug, uint32_t _max_frame_size, + framing::ProtocolVersion _version +) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), + defaultConnector(version, _debug, _max_frame_size), + isOpen(false), debug(_debug) +{ + setConnector(defaultConnector); +} + +Connection::~Connection(){} + +void Connection::setConnector(Connector& con) +{ + connector = &con; + connector->setInputHandler(this); + connector->setTimeoutHandler(this); + connector->setShutdownHandler(this); + out = connector->getOutputHandler(); +} + +void Connection::open( + const std::string& host, int port, + const std::string& uid, const std::string& pwd, const std::string& vhost) +{ + if (isOpen) + THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); + connector->connect(host, port); + channels[0] = &channel0; + channel0.open(0, *this); + channel0.protocolInit(uid, pwd, vhost); + isOpen = true; +} + +void Connection::shutdown() { + close(); +} + +void Connection::close( + ReplyCode code, const string& msg, ClassId classId, MethodId methodId +) +{ + if(isOpen) { + // TODO aconway 2007-01-29: Exception handling - could end up + // partly closed with threads left unjoined. + isOpen = false; + channel0.sendAndReceive<ConnectionCloseOkBody>( + new ConnectionCloseBody( + getVersion(), code, msg, classId, methodId)); + + using boost::bind; + for_each(channels.begin(), channels.end(), + bind(&Channel::closeInternal, + bind(&ChannelMap::value_type::second, _1))); + channels.clear(); + connector->close(); + } +} + +void Connection::openChannel(Channel& channel) { + ChannelId id = ++channelIdCounter; + assert (channels.find(id) == channels.end()); + assert(out); + channels[id] = &channel; + channel.open(id, *this); +} + +void Connection::erase(ChannelId id) { + channels.erase(id); +} + +void Connection::received(AMQFrame* frame){ + // FIXME aconway 2007-01-25: Mutex + ChannelId id = frame->getChannel(); + Channel* channel = channels[id]; + // FIXME aconway 2007-01-26: Exception thrown here is hanging the + // client. Need to review use of exceptions. + if (channel == 0) + THROW_QPID_ERROR( + PROTOCOL_ERROR+504, + (boost::format("Invalid channel number %g") % id).str()); + try{ + channel->handleBody(frame->getBody()); + }catch(const qpid::QpidError& e){ + channelException( + *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); + } +} + +void Connection::send(AMQFrame* frame) { + out->send(frame); +} + +void Connection::channelException( + Channel& channel, AMQMethodBody* method, const QpidError& e) +{ + int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500; + string msg = e.msg; + if(method == 0) + channel.close(code, msg); + else + channel.close( + code, msg, method->amqpClassId(), method->amqpMethodId()); +} + +void Connection::idleIn(){ + connector->close(); +} + +void Connection::idleOut(){ + out->send(new AMQFrame(version, 0, new AMQHeartbeatBody())); +} + +}} // namespace qpid::client diff --git a/cpp/src/client/Connection.h b/cpp/src/client/Connection.h new file mode 100644 index 0000000000..5e0b413dac --- /dev/null +++ b/cpp/src/client/Connection.h @@ -0,0 +1,179 @@ +#ifndef _client_Connection_ +#define _client_Connection_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <map> +#include <string> +#include "../QpidError.h" +#include "ClientChannel.h" +#include "Connector.h" +#include "../sys/ShutdownHandler.h" +#include "../sys/TimeoutHandler.h" + + +namespace qpid { + +/** + * The client namespace contains all classes that make up a client + * implementation of the AMQP protocol. The key classes that form + * the basis of the client API to be used by applications are + * Connection and Channel. + */ +namespace client { + +/** + * \internal provide access to selected private channel functions + * for the Connection without making it a friend of the entire channel. + */ +class ConnectionForChannel : + public framing::InputHandler, + public framing::OutputHandler, + public sys::TimeoutHandler, + public sys::ShutdownHandler + +{ + private: + friend class Channel; + virtual void erase(framing::ChannelId) = 0; +}; + + +/** + * \defgroup clientapi Application API for an AMQP client + */ + +/** + * Represents a connection to an AMQP broker. All communication is + * initiated by establishing a connection, then opening one or + * more Channels over that connection. + * + * \ingroup clientapi + */ +class Connection : public ConnectionForChannel +{ + typedef std::map<framing::ChannelId, Channel*> ChannelMap; + + framing::ChannelId channelIdCounter; + static const std::string OK; + + framing::ProtocolVersion version; + const uint32_t max_frame_size; + ChannelMap channels; + Connector defaultConnector; + Connector* connector; + framing::OutputHandler* out; + volatile bool isOpen; + Channel channel0; + bool debug; + + void erase(framing::ChannelId); + void channelException( + Channel&, framing::AMQMethodBody*, const QpidError&); + + // TODO aconway 2007-01-26: too many friendships, untagle these classes. + friend class Channel; + + public: + /** + * Creates a connection object, but does not open the + * connection. + * + * @param _version the version of the protocol to connect with + * + * @param debug turns on tracing for the connection + * (i.e. prints details of the frames sent and received to std + * out). Optional and defaults to false. + * + * @param max_frame_size the maximum frame size that the + * client will accept. Optional and defaults to 65536. + */ + Connection(bool debug = false, uint32_t max_frame_size = 65536, + framing::ProtocolVersion=framing::highestProtocolVersion); + ~Connection(); + + /** + * Opens a connection to a broker. + * + * @param host the host on which the broker is running + * + * @param port the port on the which the broker is listening + * + * @param uid the userid to connect with + * + * @param pwd the password to connect with (currently SASL + * PLAIN is the only authentication method supported so this + * is sent in clear text) + * + * @param virtualhost the AMQP virtual host to use (virtual + * hosts, where implemented(!), provide namespace partitioning + * within a single broker). + */ + void open(const std::string& host, int port = 5672, + const std::string& uid = "guest", + const std::string& pwd = "guest", + const std::string& virtualhost = "/"); + + /** + * Close the connection with optional error information for the peer. + * + * Any further use of this connection (without reopening it) will + * not succeed. + */ + void close(framing::ReplyCode=200, const std::string& msg=OK, + framing::ClassId = 0, framing::MethodId = 0); + + /** + * Associate a Channel with this connection and open it for use. + * + * In AMQP channels are like multi-plexed 'sessions' of work over + * a connection. Almost all the interaction with AMQP is done over + * a channel. + * + * @param connection the connection object to be associated with + * the channel. Call Channel::close() to close the channel. + */ + void openChannel(Channel&); + + + // TODO aconway 2007-01-26: can these be private? + void send(framing::AMQFrame*); + void received(framing::AMQFrame*); + void idleOut(); + void idleIn(); + void shutdown(); + + /**\internal used for testing */ + void setConnector(Connector& connector); + + /** + * @return the maximum frame size in use on this connection + */ + inline uint32_t getMaxFrameSize(){ return max_frame_size; } + + /** @return protocol version in use on this connection. */ + framing::ProtocolVersion getVersion() const { return version; } +}; + +}} // namespace qpid::client + + +#endif diff --git a/cpp/src/client/Connector.cpp b/cpp/src/client/Connector.cpp new file mode 100644 index 0000000000..566e58ec13 --- /dev/null +++ b/cpp/src/client/Connector.cpp @@ -0,0 +1,188 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include "../QpidError.h" +#include "../sys/Time.h" +#include "Connector.h" + +namespace qpid { +namespace client { + +using namespace qpid::sys; +using namespace qpid::framing; +using qpid::QpidError; + +Connector::Connector( + ProtocolVersion ver, bool _debug, uint32_t buffer_size +) : debug(_debug), + receive_buffer_size(buffer_size), + send_buffer_size(buffer_size), + version(ver), + closed(true), + lastIn(0), lastOut(0), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + inbuf(receive_buffer_size), + outbuf(send_buffer_size) +{ } + +Connector::~Connector(){ } + +void Connector::connect(const std::string& host, int port){ + socket = Socket::createTcp(); + socket.connect(host, port); + closed = false; + receiver = Thread(this); +} + +void Connector::init(){ + ProtocolInitiation init(version); + writeBlock(&init); +} + +void Connector::close(){ + closed = true; + socket.close(); + receiver.join(); +} + +void Connector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void Connector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* Connector::getOutputHandler(){ + return this; +} + +void Connector::send(AMQFrame* f){ + std::auto_ptr<AMQFrame> frame(f); + AMQBody::shared_ptr body = frame->getBody(); + writeBlock(frame.get()); + if(debug) std::cout << "SENT: " << *frame << std::endl; +} + +void Connector::writeBlock(AMQDataBlock* data){ + Mutex::ScopedLock l(writeLock); + data->encode(outbuf); + //transfer data to wire + outbuf.flip(); + writeToSocket(outbuf.start(), outbuf.available()); + outbuf.clear(); +} + +void Connector::writeToSocket(char* data, size_t available){ + size_t written = 0; + while(written < available && !closed){ + ssize_t sent = socket.send(data + written, available-written); + if(sent > 0) { + lastOut = now() * TIME_MSEC; + written += sent; + } + } +} + +void Connector::handleClosed(){ + closed = true; + socket.close(); + if(shutdownHandler) shutdownHandler->shutdown(); +} + +void Connector::checkIdle(ssize_t status){ + if(timeoutHandler){ + Time t = now() * TIME_MSEC; + if(status == Socket::SOCKET_TIMEOUT) { + if(idleIn && (t - lastIn > idleIn)){ + timeoutHandler->idleIn(); + } + } + else if(status == 0 || status == Socket::SOCKET_EOF) { + handleClosed(); + } + else { + lastIn = t; + } + if(idleOut && (t - lastOut > idleOut)){ + timeoutHandler->idleOut(); + } + } +} + +void Connector::setReadTimeout(uint16_t t){ + idleIn = t * 1000;//t is in secs + if(idleIn && (!timeout || idleIn < timeout)){ + timeout = idleIn; + setSocketTimeout(); + } + +} + +void Connector::setWriteTimeout(uint16_t t){ + idleOut = t * 1000;//t is in secs + if(idleOut && (!timeout || idleOut < timeout)){ + timeout = idleOut; + setSocketTimeout(); + } +} + +void Connector::setSocketTimeout(){ + socket.setTimeout(timeout*TIME_MSEC); +} + +void Connector::setTimeoutHandler(TimeoutHandler* handler){ + timeoutHandler = handler; +} + +void Connector::run(){ + try{ + while(!closed){ + ssize_t available = inbuf.available(); + if(available < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + ssize_t received = socket.recv(inbuf.start(), available); + checkIdle(received); + + if(!closed && received > 0){ + inbuf.move(received); + inbuf.flip();//position = 0, limit = total data read + + AMQFrame frame(version); + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: " << frame << std::endl; + input->received(&frame); + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + } catch (const std::exception& e) { + std::cout << e.what() << std::endl; + handleClosed(); + } +} + +}} // namespace qpid::client diff --git a/cpp/src/client/Connector.h b/cpp/src/client/Connector.h new file mode 100644 index 0000000000..928bfa2d14 --- /dev/null +++ b/cpp/src/client/Connector.h @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Connector_ +#define _Connector_ + + +#include "../framing/InputHandler.h" +#include "../framing/OutputHandler.h" +#include "../framing/InitiationHandler.h" +#include "../framing/ProtocolInitiation.h" +#include "../framing/ProtocolVersion.h" +#include "../sys/ShutdownHandler.h" +#include "../sys/TimeoutHandler.h" +#include "../sys/Thread.h" +#include "../sys/Monitor.h" +#include "../sys/Socket.h" + +namespace qpid { + +namespace client { + +class Connector : public framing::OutputHandler, + private sys::Runnable +{ + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + framing::ProtocolVersion version; + + bool closed; + + int64_t lastIn; + int64_t lastOut; + int64_t timeout; + uint32_t idleIn; + uint32_t idleOut; + + sys::TimeoutHandler* timeoutHandler; + sys::ShutdownHandler* shutdownHandler; + framing::InputHandler* input; + framing::InitiationHandler* initialiser; + framing::OutputHandler* output; + + framing::Buffer inbuf; + framing::Buffer outbuf; + + sys::Mutex writeLock; + sys::Thread receiver; + + sys::Socket socket; + + void checkIdle(ssize_t status); + void writeBlock(framing::AMQDataBlock* data); + void writeToSocket(char* data, size_t available); + void setSocketTimeout(); + + void run(); + void handleClosed(); + + friend class Channel; + public: + Connector(framing::ProtocolVersion pVersion, + bool debug = false, uint32_t buffer_size = 1024); + virtual ~Connector(); + virtual void connect(const std::string& host, int port); + virtual void init(); + virtual void close(); + virtual void setInputHandler(framing::InputHandler* handler); + virtual void setTimeoutHandler(sys::TimeoutHandler* handler); + virtual void setShutdownHandler(sys::ShutdownHandler* handler); + virtual framing::OutputHandler* getOutputHandler(); + virtual void send(framing::AMQFrame* frame); + virtual void setReadTimeout(uint16_t timeout); + virtual void setWriteTimeout(uint16_t timeout); +}; + +}} + + +#endif diff --git a/cpp/src/client/IncomingMessage.cpp b/cpp/src/client/IncomingMessage.cpp new file mode 100644 index 0000000000..05c4bc2378 --- /dev/null +++ b/cpp/src/client/IncomingMessage.cpp @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "IncomingMessage.h" +#include "../Exception.h" +#include "ClientMessage.h" +#include <boost/format.hpp> + +namespace qpid { +namespace client { + +using boost::format; +using sys::Mutex; + +IncomingMessage::Destination::~Destination() {} + +void IncomingMessage::openReference(const std::string& name) { + Mutex::ScopedLock l(lock); + if (references.find(name) != references.end()) + throw ConnectionException( + 503, format("Attempt to open existing reference %s.") % name); + references[name]; + return; +} + +void IncomingMessage::appendReference( + const std::string& name, const std::string& data) +{ + Mutex::ScopedLock l(lock); + getRefUnlocked(name).data += data; +} + +Message& IncomingMessage::createMessage( + const std::string& destination, const std::string& reference) +{ + Mutex::ScopedLock l(lock); + getDestUnlocked(destination); // Verify destination. + Reference& ref = getRefUnlocked(reference); + ref.messages.resize(ref.messages.size() +1); + ref.messages.back().setDestination(destination); + return ref.messages.back(); +} + +void IncomingMessage::closeReference(const std::string& name) { + Reference refCopy; + { + Mutex::ScopedLock l(lock); + refCopy = getRefUnlocked(name); + references.erase(name); + } + for (std::vector<Message>::iterator i = refCopy.messages.begin(); + i != refCopy.messages.end(); + ++i) + { + i->setData(refCopy.data); + // TODO aconway 2007-03-23: Thread safety, + // can a destination be removed while we're doing this? + getDestination(i->getDestination()).message(*i); + } +} + + +void IncomingMessage::addDestination(std::string name, Destination& dest) { + Mutex::ScopedLock l(lock); + DestinationMap::iterator i = destinations.find(name); + if (i == destinations.end()) + destinations[name]=&dest; + else if (i->second != &dest) + throw ConnectionException( + 503, format("Destination already exists: %s.") % name); +} + +void IncomingMessage::removeDestination(std::string name) { + Mutex::ScopedLock l(lock); + DestinationMap::iterator i = destinations.find(name); + if (i == destinations.end()) + throw ConnectionException( + 503, format("No such destination: %s.") % name); + destinations.erase(i); +} + +IncomingMessage::Destination& IncomingMessage::getDestination( + const std::string& name) { + return getDestUnlocked(name); +} + +IncomingMessage::Reference& IncomingMessage::getReference( + const std::string& name) { + return getRefUnlocked(name); +} + +IncomingMessage::Reference& IncomingMessage::getRefUnlocked( + const std::string& name) { + Mutex::ScopedLock l(lock); + ReferenceMap::iterator i = references.find(name); + if (i == references.end()) + throw ConnectionException( + 503, format("No such reference: %s.") % name); + return i->second; +} + +IncomingMessage::Destination& IncomingMessage::getDestUnlocked( + const std::string& name) { + Mutex::ScopedLock l(lock); + DestinationMap::iterator i = destinations.find(name); + if (i == destinations.end()) + throw ConnectionException( + 503, format("No such destination: %s.") % name); + return *i->second; +} + +}} // namespace qpid::client diff --git a/cpp/src/client/IncomingMessage.h b/cpp/src/client/IncomingMessage.h new file mode 100644 index 0000000000..b01bd3eedc --- /dev/null +++ b/cpp/src/client/IncomingMessage.h @@ -0,0 +1,114 @@ +#ifndef _IncomingMessage_ +#define _IncomingMessage_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../sys/Mutex.h" +#include <map> +#include <vector> + + +namespace qpid { +namespace client { + +class Message; + +/** + * Manage incoming messages. + * + * Uses reference and destination concepts from 0-9 Messsage class. + * + * Basic messages use special destination and reference names to indicate + * get-ok, return etc. messages. + * + */ +class IncomingMessage { + public: + /** Accumulate data associated with a set of messages. */ + struct Reference { + std::string data; + std::vector<Message> messages; + }; + + /** Interface to a destination for messages. */ + class Destination { + public: + virtual ~Destination(); + + /** Pass a message to the destination */ + virtual void message(const Message&) = 0; + + /** Notify destination of queue-empty contition */ + virtual void empty() = 0; + }; + + + /** Add a reference. Throws if already open. */ + void openReference(const std::string& name); + + /** Get a reference. Throws if not already open. */ + void appendReference(const std::string& name, + const std::string& data); + + /** Create a message to destination associated with reference + *@exception if destination or reference non-existent. + */ + Message& createMessage(const std::string& destination, + const std::string& reference); + + /** Get a reference. + *@exception if non-existent. + */ + Reference& getReference(const std::string& name); + + /** Close a reference and deliver all its messages. + * Throws if not open or a message has an invalid destination. + */ + void closeReference(const std::string& name); + + /** Add a destination. + *@exception if a different Destination is already registered + * under name. + */ + void addDestination(std::string name, Destination&); + + /** Remove a destination. Throws if does not exist */ + void removeDestination(std::string name); + + /** Get a destination. Throws if does not exist */ + Destination& getDestination(const std::string& name); + private: + + typedef std::map<std::string, Reference> ReferenceMap; + typedef std::map<std::string, Destination*> DestinationMap; + + Reference& getRefUnlocked(const std::string& name); + Destination& getDestUnlocked(const std::string& name); + + mutable sys::Mutex lock; + ReferenceMap references; + DestinationMap destinations; +}; + +}} + + +#endif diff --git a/cpp/src/client/Makefile.am b/cpp/src/client/Makefile.am new file mode 100644 index 0000000000..4a98c3f539 --- /dev/null +++ b/cpp/src/client/Makefile.am @@ -0,0 +1,34 @@ +AM_CXXFLAGS = $(WARNING_CFLAGS) +INCLUDES = \ + -I$(srcdir)/../gen \ + $(APR_CXXFLAGS) + +lib_LTLIBRARIES = libqpidclient.la +libqpidclient_la_LIBADD = ../libqpidcommon.la +libqpidclient_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG) +libqpidclient_la_SOURCES = \ + ClientChannel.cpp \ + ClientExchange.cpp \ + ClientQueue.cpp \ + BasicMessageChannel.cpp \ + Connection.cpp \ + Connector.cpp \ + IncomingMessage.cpp \ + MessageListener.cpp \ + ResponseHandler.cpp \ + ReturnedMessageHandler.cpp +pkginclude_HEADERS = \ + AckMode.h \ + ClientChannel.h \ + ClientExchange.h \ + ClientMessage.h \ + ClientQueue.h \ + Connection.h \ + Connector.h \ + IncomingMessage.h \ + MessageChannel.h \ + BasicMessageChannel.h \ + MessageListener.h \ + MethodBodyInstances.h \ + ResponseHandler.h \ + ReturnedMessageHandler.h diff --git a/cpp/src/client/MessageChannel.h b/cpp/src/client/MessageChannel.h new file mode 100644 index 0000000000..2fa387b7f7 --- /dev/null +++ b/cpp/src/client/MessageChannel.h @@ -0,0 +1,94 @@ +#ifndef _client_MessageChannel_h +#define _client_MessageChannel_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "../shared_ptr.h" +#include "../sys/Runnable.h" +#include "AckMode.h" + +namespace qpid { + +namespace framing { +class AMQMethodBody; +class AMQHeaderBody; +class AMQContentBody; +class FieldTable; +} + +namespace client { + +class Channel; +class Message; +class Queue; +class Exchange; +class MessageListener; +class ReturnedMessageHandler; + +/** + * Abstract interface for messaging implementation for a channel. + * + *@see Channel for documentation. + */ +class MessageChannel : public sys::Runnable +{ + public: + /**@see Channel::consume */ + virtual void consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, + const framing::FieldTable* fields = 0) = 0; + + /**@see Channel::cancel */ + virtual void cancel(const std::string& tag, bool synch = true) = 0; + + /**@see Channel::get */ + virtual bool get( + Message& msg, const Queue& queue, AckMode ackMode = NO_ACK) = 0; + + /**@see Channel::get */ + virtual void publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory = false, bool immediate = false) = 0; + + /**@see Channel::setReturnedMessageHandler */ + virtual void setReturnedMessageHandler( + ReturnedMessageHandler* handler) = 0; + + /** Handle an incoming method. */ + virtual void handle(shared_ptr<framing::AMQMethodBody>) = 0; + + /** Handle an incoming header */ + virtual void handle(shared_ptr<framing::AMQHeaderBody>) = 0; + + /** Handle an incoming content */ + virtual void handle(shared_ptr<framing::AMQContentBody>) = 0; + + /** Send channel's QOS settings */ + virtual void setQos() = 0; + + /** Channel is closing */ + virtual void close() = 0; +}; + +}} // namespace qpid::client + + + +#endif /*!_client_MessageChannel_h*/ diff --git a/cpp/src/client/MessageListener.cpp b/cpp/src/client/MessageListener.cpp new file mode 100644 index 0000000000..68ebedeb0d --- /dev/null +++ b/cpp/src/client/MessageListener.cpp @@ -0,0 +1,24 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "MessageListener.h" + +qpid::client::MessageListener::~MessageListener() {} diff --git a/cpp/src/client/MessageListener.h b/cpp/src/client/MessageListener.h new file mode 100644 index 0000000000..501862a3ef --- /dev/null +++ b/cpp/src/client/MessageListener.h @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> + +#ifndef _MessageListener_ +#define _MessageListener_ + +#include "ClientMessage.h" + +namespace qpid { +namespace client { + + /** + * An interface through which asynchronously delivered messages + * can be received by an application. + * + * @see Channel::consume() + * + * \ingroup clientapi + */ + class MessageListener{ + public: + virtual ~MessageListener(); + virtual void received(Message& msg) = 0; + }; + +} +} + + +#endif diff --git a/cpp/src/client/MethodBodyInstances.h b/cpp/src/client/MethodBodyInstances.h new file mode 100644 index 0000000000..57b9bf73ce --- /dev/null +++ b/cpp/src/client/MethodBodyInstances.h @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../framing/amqp_framing.h" + +#ifndef _MethodBodyInstances_h_ +#define _MethodBodyInstances_h_ + +namespace qpid { +namespace client { + +/** + * A list of method body instances that can be used to compare against + * incoming bodies. + */ +class MethodBodyInstances +{ +private: + qpid::framing::ProtocolVersion version; +public: + const qpid::framing::BasicCancelOkBody basic_cancel_ok; + const qpid::framing::BasicConsumeOkBody basic_consume_ok; + const qpid::framing::BasicDeliverBody basic_deliver; + const qpid::framing::BasicGetEmptyBody basic_get_empty; + const qpid::framing::BasicGetOkBody basic_get_ok; + const qpid::framing::BasicQosOkBody basic_qos_ok; + const qpid::framing::BasicReturnBody basic_return; + const qpid::framing::ChannelCloseBody channel_close; + const qpid::framing::ChannelCloseOkBody channel_close_ok; + const qpid::framing::ChannelFlowBody channel_flow; + const qpid::framing::ChannelOpenOkBody channel_open_ok; + const qpid::framing::ConnectionCloseBody connection_close; + const qpid::framing::ConnectionCloseOkBody connection_close_ok; + const qpid::framing::ConnectionOpenOkBody connection_open_ok; + const qpid::framing::ConnectionRedirectBody connection_redirect; + const qpid::framing::ConnectionStartBody connection_start; + const qpid::framing::ConnectionTuneBody connection_tune; + const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok; + const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok; + const qpid::framing::QueueDeclareOkBody queue_declare_ok; + const qpid::framing::QueueDeleteOkBody queue_delete_ok; + const qpid::framing::QueueBindOkBody queue_bind_ok; + const qpid::framing::TxCommitOkBody tx_commit_ok; + const qpid::framing::TxRollbackOkBody tx_rollback_ok; + const qpid::framing::TxSelectOkBody tx_select_ok; + + MethodBodyInstances(uint8_t major, uint8_t minor) : + version(major, minor), + basic_cancel_ok(version), + basic_consume_ok(version), + basic_deliver(version), + basic_get_empty(version), + basic_get_ok(version), + basic_qos_ok(version), + basic_return(version), + channel_close(version), + channel_close_ok(version), + channel_flow(version), + channel_open_ok(version), + connection_close(version), + connection_close_ok(version), + connection_open_ok(version), + connection_redirect(version), + connection_start(version), + connection_tune(version), + exchange_declare_ok(version), + exchange_delete_ok(version), + queue_declare_ok(version), + queue_delete_ok(version), + queue_bind_ok(version), + tx_commit_ok(version), + tx_rollback_ok(version), + tx_select_ok(version) + {} + +}; + +static MethodBodyInstances method_bodies(8, 0); + +} // namespace client +} // namespace qpid + +#endif diff --git a/cpp/src/client/ResponseHandler.cpp b/cpp/src/client/ResponseHandler.cpp new file mode 100644 index 0000000000..ca0129d587 --- /dev/null +++ b/cpp/src/client/ResponseHandler.cpp @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../QpidError.h" +#include <boost/format.hpp> +#include "ResponseHandler.h" +#include "../framing/AMQMethodBody.h" + +using namespace qpid::sys; +using namespace qpid::framing; + +namespace qpid { +namespace client { + +ResponseHandler::ResponseHandler() : waiting(false), shutdownFlag(false) {} + +ResponseHandler::~ResponseHandler(){} + +bool ResponseHandler::isWaiting() { + Monitor::ScopedLock l(monitor); + return waiting; +} + +void ResponseHandler::expect(){ + Monitor::ScopedLock l(monitor); + waiting = true; +} + +void ResponseHandler::signalResponse(MethodPtr _response) +{ + Monitor::ScopedLock l(monitor); + response = _response; + if (!response) + shutdownFlag=true; + waiting = false; + monitor.notify(); +} + +ResponseHandler::MethodPtr ResponseHandler::receive() { + Monitor::ScopedLock l(monitor); + while (!response && !shutdownFlag) + monitor.wait(); + if (shutdownFlag) + THROW_QPID_ERROR( + PROTOCOL_ERROR, "Channel closed unexpectedly."); + MethodPtr result = response; + response.reset(); + return result; +} + +ResponseHandler::MethodPtr ResponseHandler::receive(ClassId c, MethodId m) { + MethodPtr response = receive(); + if(c != response->amqpClassId() || m != response->amqpMethodId()) { + THROW_QPID_ERROR( + PROTOCOL_ERROR, + boost::format("Expected class:method %d:%d, got %d:%d") + % c % m % response->amqpClassId() % response->amqpMethodId()); + } + return response; +} + +}} // namespace qpid::client diff --git a/cpp/src/client/ResponseHandler.h b/cpp/src/client/ResponseHandler.h new file mode 100644 index 0000000000..289a5dd994 --- /dev/null +++ b/cpp/src/client/ResponseHandler.h @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../shared_ptr.h" +#include "../sys/Monitor.h" + +#ifndef _ResponseHandler_ +#define _ResponseHandler_ + +namespace qpid { + +namespace framing { +class AMQMethodBody; +} + +namespace client { + +/** + * Holds a response from the broker peer for the client. + */ +class ResponseHandler{ + typedef shared_ptr<framing::AMQMethodBody> MethodPtr; + bool waiting; + bool shutdownFlag; + MethodPtr response; + sys::Monitor monitor; + + public: + ResponseHandler(); + ~ResponseHandler(); + + /** Is a response expected? */ + bool isWaiting(); + + /** Provide a response to the waiting thread */ + void signalResponse(MethodPtr response); + + /** Indicate a message is expected. */ + void expect(); + + /** Wait for a response. */ + MethodPtr receive(); + + /** Wait for a specific response. */ + MethodPtr receive(framing::ClassId, framing::MethodId); + + /** Template version of receive returns typed pointer. */ + template <class BodyType> + shared_ptr<BodyType> receive() { + return shared_polymorphic_downcast<BodyType>( + receive(BodyType::CLASS_ID, BodyType::METHOD_ID)); + } +}; + +}} + + +#endif diff --git a/cpp/src/client/ReturnedMessageHandler.cpp b/cpp/src/client/ReturnedMessageHandler.cpp new file mode 100644 index 0000000000..35d0b5c0a8 --- /dev/null +++ b/cpp/src/client/ReturnedMessageHandler.cpp @@ -0,0 +1,24 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ReturnedMessageHandler.h" + +qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {} diff --git a/cpp/src/client/ReturnedMessageHandler.h b/cpp/src/client/ReturnedMessageHandler.h new file mode 100644 index 0000000000..8b42fc0764 --- /dev/null +++ b/cpp/src/client/ReturnedMessageHandler.h @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> + +#ifndef _ReturnedMessageHandler_ +#define _ReturnedMessageHandler_ + +#include "ClientMessage.h" + +namespace qpid { +namespace client { + + /** + * An interface through which returned messages can be received by + * an application. + * + * @see Channel::setReturnedMessageHandler() + * + * \ingroup clientapi + */ + class ReturnedMessageHandler{ + public: + virtual ~ReturnedMessageHandler(); + virtual void returned(Message& msg) = 0; + }; + +} +} + + +#endif |