diff options
Diffstat (limited to 'qpid/cpp/src/qpid')
57 files changed, 4841 insertions, 44 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 4a6590fb5f..093e9cea32 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -435,8 +435,8 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc } else { if ((iter->second->getPackageName() != packageName) || (iter->second->getClassName() != className)) { - outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER); - outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); + outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); + outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); } else try { diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 8b6cb5e049..13cf88fb11 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -386,7 +386,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) status = Manageable::STATUS_OK; else - return Manageable::STATUS_INVALID_PARAMETER; + return Manageable::STATUS_PARAMETER_INVALID; break; } default: diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index a1a3c6ada7..3b8a6c71d4 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -269,11 +269,13 @@ bool Connection::doOutput() { cb(); // Lend the IO thread for management processing } } - if (mgmtClosing) + if (mgmtClosing) { + closed(); close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request"); - else + } else { //then do other output as needed: return outputTasks.doOutput(); + } }catch(ConnectionException& e){ close(e.code, e.getMessage()); }catch(std::exception& e){ diff --git a/qpid/cpp/src/qpid/broker/Daemon.cpp b/qpid/cpp/src/qpid/broker/Daemon.cpp index aa8dd8855b..21848e23de 100644 --- a/qpid/cpp/src/qpid/broker/Daemon.cpp +++ b/qpid/cpp/src/qpid/broker/Daemon.cpp @@ -85,7 +85,7 @@ void Daemon::fork() child(); } catch (const exception& e) { - QPID_LOG(critical, "Daemon startup failed: " << e.what()); + QPID_LOG(critical, "Unexpected error: " << e.what()); uint16_t port = 0; int unused_ret; //Supress warning about ignoring return value. unused_ret = write(pipeFds[1], &port, sizeof(uint16_t)); diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 17fc0e23ca..b1fc1295f3 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -91,7 +91,9 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable); + mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); + mgmtExchange->set_durable(durable); + mgmtExchange->set_autoDelete(false); agent->addObject (mgmtExchange); } } @@ -109,7 +111,9 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable); + mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); + mgmtExchange->set_durable(durable); + mgmtExchange->set_autoDelete(false); mgmtExchange->set_arguments(args); if (!durable) { if (name.empty()) { @@ -139,6 +143,17 @@ Exchange::~Exchange () mgmtExchange->resourceDestroy (); } +void Exchange::setAlternate(Exchange::shared_ptr _alternate) +{ + alternate = _alternate; + if (mgmtExchange != 0) { + if (alternate.get() != 0) + mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId()); + else + mgmtExchange->clr_altExchange(); + } +} + void Exchange::setPersistenceId(uint64_t id) const { if (mgmtExchange != 0 && persistenceId == 0) diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index ca8525d55e..c1e878200f 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -132,7 +132,7 @@ public: qpid::framing::FieldTable& getArgs() { return args; } Exchange::shared_ptr getAlternate() { return alternate; } - void setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; } + void setAlternate(Exchange::shared_ptr _alternate); void incAlternateUsers() { alternateUsers++; } void decAlternateUsers() { alternateUsers--; } bool inUseAsAlternate() { return alternateUsers > 0; } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 9ce0c710bd..cdba18ccf9 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -200,8 +200,10 @@ void Link::destroy () // Move the bridges to be deleted into a local vector so there is no // corruption of the iterator caused by bridge deletion. - for (Bridges::iterator i = active.begin(); i != active.end(); i++) + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + (*i)->closed(); toDelete.push_back(*i); + } active.clear(); for (Bridges::iterator i = created.begin(); i != created.end(); i++) @@ -264,7 +266,6 @@ void Link::ioThreadProcessing() } if (!cancellations.empty()) { for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) { - active.push_back(*i); (*i)->cancel(*connection); } cancellations.clear(); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index b0c5e9ea00..af07605552 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -121,9 +121,11 @@ void SessionAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchang void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) { - if (alternate && alternate != exchange->getAlternate()) + if (alternate && ((exchange->getAlternate() && alternate != exchange->getAlternate()) + || !exchange->getAlternate())) throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange " - << exchange->getAlternate()->getName() << ", requested " + << (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>") + << ", requested " << alternate->getName())); } diff --git a/qpid/cpp/src/qpid/client/Demux.h b/qpid/cpp/src/qpid/client/Demux.h index 7304e799bb..31dc3f9c06 100644 --- a/qpid/cpp/src/qpid/client/Demux.h +++ b/qpid/cpp/src/qpid/client/Demux.h @@ -25,6 +25,7 @@ #include "qpid/framing/FrameSet.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/BlockingQueue.h" +#include "qpid/client/ClientImportExport.h" #ifndef _Demux_ #define _Demux_ @@ -49,17 +50,17 @@ public: typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue; typedef boost::shared_ptr<Queue> QueuePtr; - Demux(); - ~Demux(); + QPID_CLIENT_EXTERN Demux(); + QPID_CLIENT_EXTERN ~Demux(); - void handle(framing::FrameSet::shared_ptr); - void close(const sys::ExceptionHolder& ex); - void open(); - - QueuePtr add(const std::string& name, Condition); - void remove(const std::string& name); - QueuePtr get(const std::string& name); - QueuePtr getDefault(); + QPID_CLIENT_EXTERN void handle(framing::FrameSet::shared_ptr); + QPID_CLIENT_EXTERN void close(const sys::ExceptionHolder& ex); + QPID_CLIENT_EXTERN void open(); + + QPID_CLIENT_EXTERN QueuePtr add(const std::string& name, Condition); + QPID_CLIENT_EXTERN void remove(const std::string& name); + QPID_CLIENT_EXTERN QueuePtr get(const std::string& name); + QPID_CLIENT_EXTERN QueuePtr getDefault(); private: struct Record diff --git a/qpid/cpp/src/qpid/client/Results.cpp b/qpid/cpp/src/qpid/client/Results.cpp index acb2684727..0de3e8bd04 100644 --- a/qpid/cpp/src/qpid/client/Results.cpp +++ b/qpid/cpp/src/qpid/client/Results.cpp @@ -24,7 +24,6 @@ #include "qpid/framing/SequenceSet.h" using namespace qpid::framing; -using namespace boost; namespace qpid { namespace client { diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 43b62ec6dc..8ead44a172 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -202,6 +202,16 @@ bool SessionImpl::isCompleteUpTo(const SequenceNumber& id) return f.result; } +framing::SequenceNumber SessionImpl::getCompleteUpTo() +{ + SequenceNumber firstIncomplete; + { + Lock l(state); + firstIncomplete = incompleteIn.front(); + } + return --firstIncomplete; +} + struct MarkCompleted { const SequenceNumber& id; @@ -319,7 +329,7 @@ struct MethodContentAdaptor : MethodContent } -Future SessionImpl::send(const AMQBody& command, const FrameSet& content) { +Future SessionImpl::send(const AMQBody& command, const FrameSet& content, bool reframe) { Acquire a(sendLock); SequenceNumber id = nextOut++; { @@ -337,7 +347,7 @@ Future SessionImpl::send(const AMQBody& command, const FrameSet& content) { frame.setEof(false); handleOut(frame); - if (content.isComplete()) { + if (reframe) { MethodContentAdaptor c(content); sendContent(c); } else { diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h index cae1148e9f..49d268c44d 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -25,6 +25,7 @@ #include "qpid/client/Demux.h" #include "qpid/client/Execution.h" #include "qpid/client/Results.h" +#include "qpid/client/ClientImportExport.h" #include "qpid/SessionId.h" #include "qpid/SessionState.h" @@ -86,7 +87,15 @@ public: Future send(const framing::AMQBody& command); Future send(const framing::AMQBody& command, const framing::MethodContent& content); - Future send(const framing::AMQBody& command, const framing::FrameSet& content); + /** + * This method takes the content as a FrameSet; if reframe=false, + * the caller is resposnible for ensuring that the header and + * content frames in that set are correct for this connection + * (right flags, right fragmentation etc). If reframe=true, then + * the header and content from the frameset will be copied and + * reframed correctly for the connection. + */ + QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content, bool reframe=false); void sendRawFrame(framing::AMQFrame& frame); Demux& getDemux(); @@ -94,6 +103,7 @@ public: void markCompleted(const framing::SequenceSet& ids, bool notifyPeer); bool isComplete(const framing::SequenceNumber& id); bool isCompleteUpTo(const framing::SequenceNumber& id); + framing::SequenceNumber getCompleteUpTo(); void waitForCompletion(const framing::SequenceNumber& id); void sendCompletion(); void sendFlush(); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp new file mode 100644 index 0000000000..9b9f06ec57 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -0,0 +1,461 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/client/amqp0_10/MessageSource.h" +#include "qpid/client/amqp0_10/MessageSink.h" +#include "qpid/client/amqp0_10/OutgoingMessage.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Filter.h" +#include "qpid/messaging/Message.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/enum.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/ReplyTo.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::Exception; +using qpid::messaging::Address; +using qpid::messaging::Filter; +using qpid::messaging::Variant; +using qpid::framing::FieldTable; +using qpid::framing::ReplyTo; +using namespace qpid::framing::message; + + +namespace{ +const Variant EMPTY_VARIANT; +const FieldTable EMPTY_FIELD_TABLE; +const std::string EMPTY_STRING; + +//option names +const std::string BROWSE("browse"); +const std::string EXCLUSIVE("exclusive"); +const std::string MODE("mode"); +const std::string NAME("name"); +const std::string UNACKNOWLEDGED("unacknowledged"); + +const std::string QUEUE_ADDRESS("queue"); +const std::string TOPIC_ADDRESS("topic"); +const std::string TOPIC_ADDRESS_AND_SUBJECT("topic+"); +const std::string DIVIDER("/"); + +const std::string SIMPLE_SUBSCRIPTION("simple"); +const std::string RELIABLE_SUBSCRIPTION("reliable"); +const std::string DURABLE_SUBSCRIPTION("durable"); +} + +class QueueSource : public MessageSource +{ + public: + QueueSource(const std::string& name, AcceptMode=ACCEPT_MODE_EXPLICIT, AcquireMode=ACQUIRE_MODE_PRE_ACQUIRED, + bool exclusive = false, const FieldTable& options = EMPTY_FIELD_TABLE); + void subscribe(qpid::client::AsyncSession& session, const std::string& destination); + void cancel(qpid::client::AsyncSession& session, const std::string& destination); + private: + const std::string name; + const AcceptMode acceptMode; + const AcquireMode acquireMode; + const bool exclusive; + const FieldTable options; +}; + +class Subscription : public MessageSource +{ + public: + enum SubscriptionMode {SIMPLE, RELIABLE, DURABLE}; + + Subscription(const std::string& name, SubscriptionMode mode = SIMPLE, + const FieldTable& queueOptions = EMPTY_FIELD_TABLE, const FieldTable& subscriptionOptions = EMPTY_FIELD_TABLE); + void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); + void subscribe(qpid::client::AsyncSession& session, const std::string& destination); + void cancel(qpid::client::AsyncSession& session, const std::string& destination); + + static SubscriptionMode getMode(const std::string& mode); + private: + struct Binding + { + Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); + + std::string exchange; + std::string key; + FieldTable options; + }; + + typedef std::vector<Binding> Bindings; + + const std::string name; + const bool autoDelete; + const bool durable; + const FieldTable queueOptions; + const FieldTable subscriptionOptions; + Bindings bindings; + std::string queue; +}; + +class Exchange : public MessageSink +{ + public: + Exchange(const std::string& name, const std::string& defaultSubject = EMPTY_STRING, + bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false, + const FieldTable& options = EMPTY_FIELD_TABLE); + void declare(qpid::client::AsyncSession& session, const std::string& name); + void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); + void cancel(qpid::client::AsyncSession& session, const std::string& name); + private: + const std::string name; + const std::string defaultSubject; + const bool passive; + const std::string type; + const bool durable; + const FieldTable options; +}; + +class QueueSink : public MessageSink +{ + public: + QueueSink(const std::string& name, bool passive=true, bool exclusive=false, + bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE); + void declare(qpid::client::AsyncSession& session, const std::string& name); + void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); + void cancel(qpid::client::AsyncSession& session, const std::string& name); + private: + const std::string name; + const bool passive; + const bool exclusive; + const bool autoDelete; + const bool durable; + const FieldTable options; +}; + + +bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address); +bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject); + +const Variant& getOption(const std::string& key, const Variant::Map& options) +{ + Variant::Map::const_iterator i = options.find(key); + if (i == options.end()) return EMPTY_VARIANT; + else return i->second; +} + +std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session, + const Address& address, + const Filter* filter, + const Variant::Map& options) +{ + //TODO: handle case where there exists a queue and an exchange of + //the same name (hence an unqualified address is ambiguous) + + //TODO: make sure specified address type gives sane error message + //if it does npt match the configuration on server + + if (isQueue(session, address)) { + //TODO: support auto-created queue as source, if requested by specific option + + AcceptMode accept = getOption(UNACKNOWLEDGED, options).asBool() ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT; + AcquireMode acquire = getOption(BROWSE, options).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED; + bool exclusive = getOption(EXCLUSIVE, options).asBool(); + FieldTable arguments; + //TODO: extract subscribe arguments from options (e.g. either + //filter out already processed keys and send the rest, or have + //a nested map) + + std::auto_ptr<MessageSource> source = + std::auto_ptr<MessageSource>(new QueueSource(address.value, accept, acquire, exclusive, arguments)); + return source; + } else { + //TODO: extract queue options (e.g. no-local) and subscription options (e.g. less important) + std::auto_ptr<Subscription> bindings = + std::auto_ptr<Subscription>(new Subscription(getOption(NAME, options).asString(), + Subscription::getMode(getOption(MODE, options).asString()))); + + qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.value); + if (result.getNotFound()) { + throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address)); + } else if (result.getType() == "topic") { + if (filter) { + if (filter->type != Filter::WILDCARD) { + throw qpid::framing::NotImplementedException( + QPID_MSG("Filters of type " << filter->type << " not supported by address " << address)); + + } + for (std::vector<std::string>::const_iterator i = filter->patterns.begin(); i != filter->patterns.end(); i++) { + bindings->add(address.value, *i, qpid::framing::FieldTable()); + } + } else { + //default is to receive all messages + bindings->add(address.value, "*", qpid::framing::FieldTable()); + } + } else if (result.getType() == "fanout") { + if (filter) { + throw qpid::framing::NotImplementedException(QPID_MSG("Filters are not supported by address " << address)); + } + bindings->add(address.value, address.value, qpid::framing::FieldTable()); + } else if (result.getType() == "direct") { + //TODO: ???? + } else { + //TODO: xml and headers exchanges + throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised for " << address)); + } + std::auto_ptr<MessageSource> source = std::auto_ptr<MessageSource>(bindings.release()); + return source; + } +} + + +std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session, + const qpid::messaging::Address& address, + const qpid::messaging::Variant::Map& /*options*/) +{ + std::auto_ptr<MessageSink> sink; + if (isQueue(session, address)) { + //TODO: support for auto-created queues as sink + sink = std::auto_ptr<MessageSink>(new QueueSink(address.value)); + } else { + std::string subject; + if (isTopic(session, address, subject)) { + //TODO: support for auto-created exchanges as sink + sink = std::auto_ptr<MessageSink>(new Exchange(address.value, subject)); + } else { + if (address.type.empty()) { + throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address)); + } else { + throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised: " << address.type)); + } + } + } + return sink; +} + +QueueSource::QueueSource(const std::string& _name, AcceptMode _acceptMode, AcquireMode _acquireMode, bool _exclusive, const FieldTable& _options) : + name(_name), acceptMode(_acceptMode), acquireMode(_acquireMode), exclusive(_exclusive), options(_options) {} + +void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination) +{ + session.messageSubscribe(arg::queue=name, + arg::destination=destination, + arg::acceptMode=acceptMode, + arg::acquireMode=acquireMode, + arg::exclusive=exclusive, + arg::arguments=options); +} + +void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination) +{ + session.messageCancel(destination); +} + +Subscription::Subscription(const std::string& _name, SubscriptionMode mode, const FieldTable& qOptions, const FieldTable& sOptions) + : name(_name), autoDelete(mode == SIMPLE), durable(mode == DURABLE), + queueOptions(qOptions), subscriptionOptions(sOptions) {} + +void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options) +{ + bindings.push_back(Binding(exchange, key, options)); +} + +void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination) +{ + if (name.empty()) { + //TODO: use same scheme as JMS client for subscription queue name generation? + queue = session.getId().getName() + destination; + } else { + queue = name; + } + session.queueDeclare(arg::queue=queue, arg::exclusive=true, + arg::autoDelete=autoDelete, arg::durable=durable, arg::arguments=queueOptions); + for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) { + session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options); + } + AcceptMode accept = autoDelete ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT; + session.messageSubscribe(arg::queue=queue, arg::destination=destination, + arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions); +} + +void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination) +{ + session.messageCancel(destination); + session.queueDelete(arg::queue=queue); +} + +Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o): + exchange(e), key(k), options(o) {} + +Subscription::SubscriptionMode Subscription::getMode(const std::string& s) +{ + if (s.empty() || s == SIMPLE_SUBSCRIPTION) return SIMPLE; + else if (s == RELIABLE_SUBSCRIPTION) return RELIABLE; + else if (s == DURABLE_SUBSCRIPTION) return DURABLE; + else throw Exception(QPID_MSG("Unrecognised subscription mode: " << s)); +} + +void convert(qpid::messaging::Message& from, qpid::client::Message& to); + +Exchange::Exchange(const std::string& _name, const std::string& _defaultSubject, + bool _passive, const std::string& _type, bool _durable, const FieldTable& _options) : + name(_name), defaultSubject(_defaultSubject), passive(_passive), type(_type), durable(_durable), options(_options) {} + +void Exchange::declare(qpid::client::AsyncSession& session, const std::string&) +{ + //TODO: should this really by synchronous? want to get error if not valid... + if (passive) { + sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); + } else { + sync(session).exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::arguments=options); + } +} + +void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) +{ + if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) { + m.message.getDeliveryProperties().setRoutingKey(defaultSubject); + } + m.status = session.messageTransfer(arg::destination=name, arg::content=m.message); +} + +void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {} + +QueueSink::QueueSink(const std::string& _name, bool _passive, bool _exclusive, + bool _autoDelete, bool _durable, const FieldTable& _options) : + name(_name), passive(_passive), exclusive(_exclusive), + autoDelete(_autoDelete), durable(_durable), options(_options) {} + +void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&) +{ + //TODO: should this really by synchronous? + if (passive) { + sync(session).queueDeclare(arg::queue=name, arg::passive=true); + } else { + sync(session).queueDeclare(arg::queue=name, arg::exclusive=exclusive, arg::durable=durable, + arg::autoDelete=autoDelete, arg::arguments=options); + } +} +void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) +{ + m.message.getDeliveryProperties().setRoutingKey(name); + m.status = session.messageTransfer(arg::content=m.message); +} + +void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {} + +template <class T> void encode(qpid::messaging::Message& from) +{ + T codec; + from.encode(codec); + from.setContentType(T::contentType); +} + +void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp + +void convert(qpid::messaging::Message& from, qpid::client::Message& to) +{ + //TODO: need to avoid copying as much as possible + if (from.getContent().isList()) encode<ListCodec>(from); + if (from.getContent().isMap()) encode<MapCodec>(from); + to.setData(from.getBytes()); + to.getDeliveryProperties().setRoutingKey(from.getSubject()); + //TODO: set other delivery properties + to.getMessageProperties().setContentType(from.getContentType()); + const Address& address = from.getReplyTo(); + if (!address.value.empty()) { + to.getMessageProperties().setReplyTo(AddressResolution::convert(address)); + } + translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders()); + //TODO: set other message properties +} + +Address AddressResolution::convert(const qpid::framing::ReplyTo& rt) +{ + if (rt.getExchange().empty()) { + if (rt.getRoutingKey().empty()) { + return Address();//empty address + } else { + return Address(rt.getRoutingKey(), QUEUE_ADDRESS); + } + } else { + if (rt.getRoutingKey().empty()) { + return Address(rt.getExchange(), TOPIC_ADDRESS); + } else { + return Address(rt.getExchange() + DIVIDER + rt.getRoutingKey(), TOPIC_ADDRESS_AND_SUBJECT); + } + } +} + +qpid::framing::ReplyTo AddressResolution::convert(const Address& address) +{ + if (address.type == QUEUE_ADDRESS || address.type.empty()) { + return ReplyTo(EMPTY_STRING, address.value); + } else if (address.type == TOPIC_ADDRESS) { + return ReplyTo(address.value, EMPTY_STRING); + } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) { + //need to split the value + string::size_type i = address.value.find(DIVIDER); + if (i != string::npos) { + std::string exchange = address.value.substr(0, i); + std::string routingKey; + if (i+1 < address.value.size()) { + routingKey = address.value.substr(i+1); + } + return ReplyTo(exchange, routingKey); + } else { + return ReplyTo(address.value, EMPTY_STRING); + } + } else { + QPID_LOG(notice, "Unrecognised type for reply-to: " << address.type); + //treat as queue + return ReplyTo(EMPTY_STRING, address.value); + } +} + +bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address) +{ + return address.type == QUEUE_ADDRESS || + (address.type.empty() && session.queueQuery(address.value).getQueue() == address.value); +} + +bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject) +{ + if (address.type.empty()) { + return !session.exchangeQuery(address.value).getNotFound(); + } else if (address.type == TOPIC_ADDRESS) { + return true; + } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) { + string::size_type i = address.value.find(DIVIDER); + if (i != string::npos) { + std::string exchange = address.value.substr(0, i); + if (i+1 < address.value.size()) { + subject = address.value.substr(i+1); + } + } + return true; + } else { + return false; + } +} + + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h new file mode 100644 index 0000000000..9d5657450d --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h @@ -0,0 +1,68 @@ +#ifndef QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H +#define QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Variant.h" +#include "qpid/client/Session.h" + +namespace qpid { + +namespace framing{ +class ReplyTo; +} + +namespace messaging { +struct Address; +struct Filter; +} + +namespace client { +namespace amqp0_10 { + +class MessageSource; +class MessageSink; + +/** + * Maps from a generic Address and optional Filter to an AMQP 0-10 + * MessageSource which will then be used by a ReceiverImpl instance + * created for the address. + */ +class AddressResolution +{ + public: + std::auto_ptr<MessageSource> resolveSource(qpid::client::Session session, + const qpid::messaging::Address& address, + const qpid::messaging::Filter* filter, + const qpid::messaging::Variant::Map& options); + + std::auto_ptr<MessageSink> resolveSink(qpid::client::Session session, + const qpid::messaging::Address& address, + const qpid::messaging::Variant::Map& options); + + static qpid::messaging::Address convert(const qpid::framing::ReplyTo&); + static qpid::framing::ReplyTo convert(const qpid::messaging::Address&); + + private: +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp b/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp new file mode 100644 index 0000000000..57184e3937 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp @@ -0,0 +1,299 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/messaging/Variant.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/List.h" +#include <algorithm> +#include <functional> + +using namespace qpid::framing; +using namespace qpid::messaging; + +namespace qpid { +namespace client { +namespace amqp0_10 { + +namespace { +const std::string iso885915("iso-8859-15"); +const std::string utf8("utf8"); +const std::string utf16("utf16"); +const std::string amqp0_10_binary("amqp0-10:binary"); +const std::string amqp0_10_bit("amqp0-10:bit"); +const std::string amqp0_10_datetime("amqp0-10:datetime"); +const std::string amqp0_10_struct("amqp0-10:struct"); +} + +template <class T, class U, class F> void convert(const T& from, U& to, F f) +{ + std::transform(from.begin(), from.end(), std::inserter(to, to.begin()), f); +} + +Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in); +FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in); +Variant toVariant(boost::shared_ptr<FieldValue> in); +boost::shared_ptr<FieldValue> toFieldValue(const Variant& in); + +template <class T, class U, class F> void translate(boost::shared_ptr<FieldValue> in, U& u, F f) +{ + T t; + getEncodedValue<T>(in, t); + convert(t, u, f); +} + +template <class T, class U, class F> T* toFieldValueCollection(const U& u, F f) +{ + typename T::ValueType t; + convert(u, t, f); + return new T(t); +} + +FieldTableValue* toFieldTableValue(const Variant::Map& map) +{ + FieldTable ft; + convert(map, ft, &toFieldTableEntry); + return new FieldTableValue(ft); +} + +ListValue* toListValue(const Variant::List& list) +{ + List l; + convert(list, l, &toFieldValue); + return new ListValue(l); +} + +void setEncodingFor(Variant& out, uint8_t code) +{ + switch(code){ + case 0x80: + case 0x90: + case 0xa0: + out.setEncoding(amqp0_10_binary); + break; + case 0x84: + case 0x94: + out.setEncoding(iso885915); + break; + case 0x85: + case 0x95: + out.setEncoding(utf8); + break; + case 0x86: + case 0x96: + out.setEncoding(utf16); + break; + case 0xab: + out.setEncoding(amqp0_10_struct); + break; + default: + //do nothing + break; + } +} + +Variant toVariant(boost::shared_ptr<FieldValue> in) +{ + Variant out; + //based on AMQP 0-10 typecode, pick most appropriate variant type + switch (in->getType()) { + //Fixed Width types: + case 0x01: out.setEncoding(amqp0_10_binary); + case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; + case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; + case 0x04: break; //TODO: iso-8859-15 char + case 0x08: out = in->getIntegerValue<bool, 1>(); break; + case 0x010: out.setEncoding(amqp0_10_binary); + case 0x011: out = in->getIntegerValue<int16_t, 2>(); break; + case 0x012: out = in->getIntegerValue<uint16_t, 2>(); break; + case 0x020: out.setEncoding(amqp0_10_binary); + case 0x021: out = in->getIntegerValue<int32_t, 4>(); break; + case 0x022: out = in->getIntegerValue<uint32_t, 4>(); break; + case 0x023: out = in->get<float>(); break; + case 0x027: break; //TODO: utf-32 char + case 0x030: out.setEncoding(amqp0_10_binary); + case 0x031: out = in->getIntegerValue<int64_t, 8>(); break; + case 0x038: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding + case 0x032: out = in->getIntegerValue<uint64_t, 8>(); break; + case 0x033:out = in->get<double>(); break; + + //TODO: figure out whether and how to map values with codes 0x40-0xd8 + + case 0xf0: break;//void, which is the default value for Variant + case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant + + //Variable Width types: + //strings: + case 0x80: + case 0x84: + case 0x85: + case 0x86: + case 0x90: + case 0x94: + case 0x95: + case 0x96: + case 0xa0: + case 0xab: + setEncodingFor(out, in->getType()); + out = in->get<std::string>(); + break; + + case 0xa8: + out = Variant::Map(); + translate<FieldTable>(in, out.asMap(), &toVariantMapEntry); + break; + + case 0xa9: + out = Variant::List(); + translate<List>(in, out.asList(), &toVariant); + break; + case 0xaa: //convert amqp0-10 array into variant list + out = Variant::List(); + translate<Array>(in, out.asList(), &toVariant); + break; + + default: + //error? + break; + } + return out; +} + +boost::shared_ptr<FieldValue> toFieldValue(const Variant& in) +{ + boost::shared_ptr<FieldValue> out; + switch (in.getType()) { + case VAR_VOID: out = boost::shared_ptr<FieldValue>(new VoidValue()); break; + case VAR_BOOL: out = boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); break; + case VAR_UINT8: out = boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); break; + case VAR_UINT16: out = boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); break; + case VAR_UINT32: out = boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); break; + case VAR_UINT64: out = boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); break; + case VAR_INT8: out = boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); break; + case VAR_INT16: out = boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); break; + case VAR_INT32: out = boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); break; + case VAR_INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break; + case VAR_FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break; + case VAR_DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break; + //TODO: check encoding (and length?) when deciding what AMQP type to treat string as + case VAR_STRING: out = boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); break; + case VAR_MAP: + //out = boost::shared_ptr<FieldValue>(toFieldValueCollection<FieldTableValue>(in.asMap(), &toFieldTableEntry)); + out = boost::shared_ptr<FieldValue>(toFieldTableValue(in.asMap())); + break; + case VAR_LIST: + //out = boost::shared_ptr<FieldValue>(toFieldValueCollection<ListValue>(in.asList(), &toFieldValue)); + out = boost::shared_ptr<FieldValue>(toListValue(in.asList())); + break; + } + return out; +} + +Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in) +{ + return Variant::Map::value_type(in.first, toVariant(in.second)); +} + +FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in) +{ + return FieldTable::value_type(in.first, toFieldValue(in.second)); +} + +struct EncodeBuffer +{ + char* data; + Buffer buffer; + + EncodeBuffer(size_t size) : data(new char[size]), buffer(data, size) {} + ~EncodeBuffer() { delete[] data; } + + template <class T> void encode(T& t) { t.encode(buffer); } + + void getData(std::string& s) { + s.assign(data, buffer.getSize()); + } +}; + +struct DecodeBuffer +{ + Buffer buffer; + + DecodeBuffer(const std::string& s) : buffer(const_cast<char*>(s.data()), s.size()) {} + + template <class T> void decode(T& t) { t.decode(buffer); } + +}; + +template <class T, class U, class F> void _encode(const U& value, std::string& data, F f) +{ + T t; + convert(value, t, f); + EncodeBuffer buffer(t.encodedSize()); + buffer.encode(t); + buffer.getData(data); +} + +template <class T, class U, class F> void _decode(const std::string& data, U& value, F f) +{ + T t; + DecodeBuffer buffer(data); + buffer.decode(t); + convert(t, value, f); +} + +void MapCodec::encode(const Variant& value, std::string& data) +{ + _encode<FieldTable>(value.asMap(), data, &toFieldTableEntry); +} + +void MapCodec::decode(const std::string& data, Variant& value) +{ + value = Variant::Map(); + _decode<FieldTable>(data, value.asMap(), &toVariantMapEntry); +} + +void ListCodec::encode(const Variant& value, std::string& data) +{ + _encode<List>(value.asList(), data, &toFieldValue); +} + +void ListCodec::decode(const std::string& data, Variant& value) +{ + value = Variant::List(); + _decode<List>(data, value.asList(), &toVariant); +} + +void translate(const Variant::Map& from, FieldTable& to) +{ + convert(from, to, &toFieldTableEntry); +} + +void translate(const FieldTable& from, Variant::Map& to) +{ + convert(from, to, &toVariantMapEntry); +} + +const std::string ListCodec::contentType("amqp0_10/list"); +const std::string MapCodec::contentType("amqp0_10/map"); + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h b/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h new file mode 100644 index 0000000000..b5a561a9c3 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h @@ -0,0 +1,41 @@ +#ifndef QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H +#define QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Variant.h" +#include "qpid/framing/FieldTable.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +/** + * Declarations of a couple of conversion functions implemented in + * Codecs.cpp but not exposed through API + */ + +void translate(const qpid::messaging::Variant::Map& from, qpid::framing::FieldTable& to); +void translate(const qpid::framing::FieldTable& from, qpid::messaging::Variant::Map& to); + +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp new file mode 100644 index 0000000000..52b623b65c --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "CompletionTracker.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::framing::SequenceNumber; + +void CompletionTracker::track(SequenceNumber command, void* token) +{ + tokens[command] = token; +} + +void CompletionTracker::completedTo(SequenceNumber command) +{ + Tokens::iterator i = tokens.lower_bound(command); + if (i != tokens.end()) { + lastCompleted = i->second; + tokens.erase(tokens.begin(), ++i); + } +} + +void* CompletionTracker::getLastCompletedToken() +{ + return lastCompleted; +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h new file mode 100644 index 0000000000..6147c5682e --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h @@ -0,0 +1,50 @@ +#ifndef QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H +#define QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceNumber.h" +#include <map> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +/** + * Provides a mapping from command ids to application supplied + * 'tokens', and is used to determine when the sending or + * acknowledging of a specific message is complete. + */ +class CompletionTracker +{ + public: + void track(qpid::framing::SequenceNumber command, void* token); + void completedTo(qpid::framing::SequenceNumber command); + void* getLastCompletedToken(); + private: + typedef std::map<qpid::framing::SequenceNumber, void*> Tokens; + Tokens tokens; + void* lastCompleted; +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp new file mode 100644 index 0000000000..3a735b5698 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -0,0 +1,181 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionImpl.h" +#include "SessionImpl.h" +#include "qpid/messaging/Session.h" +#include "qpid/client/PrivateImplRef.h" +#include "qpid/log/Statement.h" +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::messaging::Variant; +using namespace qpid::sys; + +template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value) +{ + Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + value = (T) i->second; + } +} + +void convert(const Variant::Map& from, ConnectionSettings& to) +{ + setIfFound(from, "username", to.username); + setIfFound(from, "password", to.password); + setIfFound(from, "sasl-mechanism", to.mechanism); + setIfFound(from, "sasl-service", to.service); + setIfFound(from, "sasl-min-ssf", to.minSsf); + setIfFound(from, "sasl-max-ssf", to.maxSsf); + + setIfFound(from, "heartbeat", to.heartbeat); + setIfFound(from, "tcp-nodelay", to.tcpNoDelay); + + setIfFound(from, "locale", to.locale); + setIfFound(from, "max-channels", to.maxChannels); + setIfFound(from, "max-frame-size", to.maxFrameSize); + setIfFound(from, "bounds", to.bounds); +} + +ConnectionImpl::ConnectionImpl(const std::string& u, const Variant::Map& options) : + url(u), reconnectionEnabled(true), timeout(-1), + minRetryInterval(1), maxRetryInterval(30) +{ + QPID_LOG(debug, "Opening connection to " << url << " with " << options); + convert(options, settings); + setIfFound(options, "reconnection-enabled", reconnectionEnabled); + setIfFound(options, "reconnection-timeout", timeout); + setIfFound(options, "min-retry-interval", minRetryInterval); + setIfFound(options, "max-retry-interval", maxRetryInterval); + connection.open(url, settings); +} + +void ConnectionImpl::close() +{ + qpid::sys::Mutex::ScopedLock l(lock); + connection.close(); +} + +boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session) +{ + return boost::dynamic_pointer_cast<SessionImpl>( + qpid::client::PrivateImplRef<qpid::messaging::Session>::get(session) + ); +} + +void ConnectionImpl::closed(SessionImpl& s) +{ + qpid::sys::Mutex::ScopedLock l(lock); + for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { + if (getImplPtr(*i).get() == &s) { + sessions.erase(i); + break; + } + } +} + +qpid::messaging::Session ConnectionImpl::newSession() +{ + qpid::messaging::Session impl(new SessionImpl(*this)); + { + qpid::sys::Mutex::ScopedLock l(lock); + sessions.push_back(impl); + } + try { + getImplPtr(impl)->setSession(connection.newSession()); + } catch (const TransportFailure&) { + reconnect(); + } + return impl; +} + +void ConnectionImpl::reconnect() +{ + AbsTime start = now(); + ScopedLock<Semaphore> l(semaphore); + if (!connection.isOpen()) connect(start); +} + +bool expired(const AbsTime& start, int timeout) +{ + if (timeout == 0) return true; + if (timeout < 0) return false; + Duration used(start, now()); + Duration allowed = timeout * TIME_SEC; + return allowed > used; +} + +void ConnectionImpl::connect(const AbsTime& started) +{ + for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) { + if (expired(started, timeout)) throw TransportFailure(); + else qpid::sys::sleep(i); + } +} + +bool ConnectionImpl::tryConnect() +{ + if (tryConnect(url) || tryConnect(connection.getKnownBrokers())) { + return resetSessions(); + } else { + return false; + } +} + +bool ConnectionImpl::tryConnect(const Url& u) +{ + try { + QPID_LOG(info, "Trying to connect to " << url << "..."); + connection.open(u, settings); + return true; + } catch (const Exception& e) { + //TODO: need to fix timeout on open so that it throws TransportFailure + QPID_LOG(info, "Failed to connect to " << u << ": " << e.what()); + } + return false; +} + +bool ConnectionImpl::tryConnect(const std::vector<Url>& urls) +{ + for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) { + if (tryConnect(*i)) return true; + } + return false; +} + +bool ConnectionImpl::resetSessions() +{ + try { + qpid::sys::Mutex::ScopedLock l(lock); + for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { + getImplPtr(*i)->setSession(connection.newSession()); + } + return true; + } catch (const TransportFailure&) { + QPID_LOG(debug, "Connection failed while re-inialising sessions"); + return false; + } +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h new file mode 100644 index 0000000000..565f2ec7ec --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -0,0 +1,69 @@ +#ifndef QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H +#define QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/ConnectionImpl.h" +#include "qpid/messaging/Variant.h" +#include "qpid/Url.h" +#include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Semaphore.h" +#include <vector> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +class SessionImpl; + +class ConnectionImpl : public qpid::messaging::ConnectionImpl +{ + public: + ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options); + void close(); + qpid::messaging::Session newSession(); + void closed(SessionImpl&); + void reconnect(); + private: + typedef std::vector<qpid::messaging::Session> Sessions; + + qpid::sys::Mutex lock;//used to protect data structures + qpid::sys::Semaphore semaphore;//used to coordinate reconnection + qpid::client::Connection connection; + qpid::Url url; + qpid::client::ConnectionSettings settings; + Sessions sessions; + bool reconnectionEnabled; + int timeout; + int minRetryInterval; + int maxRetryInterval; + + void connect(const qpid::sys::AbsTime& started); + bool tryConnect(); + bool tryConnect(const std::vector<Url>& urls); + bool tryConnect(const Url&); + bool resetSessions(); +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp new file mode 100644 index 0000000000..b0a16674e1 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -0,0 +1,244 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/IncomingMessages.h" +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/client/amqp0_10/CodecsInternal.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/messaging/Variant.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/enum.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using namespace qpid::framing; +using namespace qpid::framing::message; +using qpid::sys::AbsTime; +using qpid::sys::Duration; +using qpid::messaging::MessageImplAccess; +using qpid::messaging::Variant; + +namespace { +const std::string EMPTY_STRING; + + +struct GetNone : IncomingMessages::Handler +{ + bool accept(IncomingMessages::MessageTransfer&) { return false; } +}; + +struct GetAny : IncomingMessages::Handler +{ + bool accept(IncomingMessages::MessageTransfer& transfer) + { + transfer.retrieve(0); + return true; + } +}; + +struct MatchAndTrack +{ + const std::string destination; + SequenceSet ids; + + MatchAndTrack(const std::string& d) : destination(d) {} + + bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command) + { + if (command->as<MessageTransferBody>()->getDestination() == destination) { + ids.add(command->getId()); + return true; + } else { + return false; + } + } +}; +} + +void IncomingMessages::setSession(qpid::client::AsyncSession s) +{ + session = s; + incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault(); +} + +bool IncomingMessages::get(Handler& handler, Duration timeout) +{ + //search through received list for any transfer of interest: + for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++) + { + MessageTransfer transfer(*i, *this); + if (handler.accept(transfer)) { + received.erase(i); + return true; + } + } + //none found, check incoming: + return process(&handler, timeout); +} + +void IncomingMessages::accept() +{ + session.messageAccept(unaccepted); + unaccepted.clear(); +} + +void IncomingMessages::releaseAll() +{ + //first process any received messages... + while (!received.empty()) { + retrieve(received.front(), 0); + received.pop_front(); + } + //then pump out any available messages from incoming queue... + GetAny handler; + while (process(&handler, 0)) ; + //now release all messages + session.messageRelease(unaccepted); + unaccepted.clear(); +} + +void IncomingMessages::releasePending(const std::string& destination) +{ + //first pump all available messages from incoming to received... + while (process(0, 0)) ; + + //now remove all messages for this destination from received list, recording their ids... + MatchAndTrack match(destination); + for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i) ; + //now release those messages + session.messageRelease(match.ids); +} + +/** + * Get a frameset from session queue, waiting for up to the specified + * duration and returning true if this could be achieved, false + * otherwise. If a destination is supplied, only return a message for + * that destination. In this case messages from other destinations + * will be held on a received queue. + */ +bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) +{ + AbsTime deadline(AbsTime::now(), duration); + FrameSet::shared_ptr content; + for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { + if (content->isA<MessageTransferBody>()) { + MessageTransfer transfer(content, *this); + if (handler && handler->accept(transfer)) { + QPID_LOG(debug, "Delivered " << *content->getMethod()); + return true; + } else { + //received message for another destination, keep for later + QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); + received.push_back(content); + } + } else { + //TODO: handle other types of commands (e.g. message-accept, message-flow etc) + } + } + return false; +} + +void populate(qpid::messaging::Message& message, FrameSet& command); + +/** + * Called when message is retrieved; records retrieval for subsequent + * acceptance, marks the command as completed and converts command to + * message if message is required + */ +void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* message) +{ + if (message) { + populate(*message, *command); + } + const MessageTransferBody* transfer = command->as<MessageTransferBody>(); + if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) { + unaccepted.add(command->getId()); + } + session.markCompleted(command->getId(), false, false); +} + +IncomingMessages::MessageTransfer::MessageTransfer(FrameSetPtr c, IncomingMessages& p) : content(c), parent(p) {} + +const std::string& IncomingMessages::MessageTransfer::getDestination() +{ + return content->as<MessageTransferBody>()->getDestination(); +} +void IncomingMessages::MessageTransfer::retrieve(qpid::messaging::Message* message) +{ + parent.retrieve(content, message); +} + +void populateHeaders(qpid::messaging::Message& message, + const DeliveryProperties* deliveryProperties, + const MessageProperties* messageProperties) +{ + if (deliveryProperties) { + message.setSubject(deliveryProperties->getRoutingKey()); + //TODO: convert other delivery properties + } + if (messageProperties) { + message.setContentType(messageProperties->getContentType()); + if (messageProperties->hasReplyTo()) { + message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo())); + } + message.getHeaders().clear(); + translate(messageProperties->getApplicationHeaders(), message.getHeaders()); + //TODO: convert other message properties + } +} + +void populateHeaders(qpid::messaging::Message& message, const AMQHeaderBody* headers) +{ + populateHeaders(message, headers->get<DeliveryProperties>(), headers->get<MessageProperties>()); +} + +void populate(qpid::messaging::Message& message, FrameSet& command) +{ + //need to be able to link the message back to the transfer it was delivered by + //e.g. for rejecting. + MessageImplAccess::get(message).setInternalId(command.getId()); + + command.getContent(message.getBytes()); + + populateHeaders(message, command.getHeaders()); + + //decode content if necessary + if (message.getContentType() == ListCodec::contentType) { + ListCodec codec; + message.decode(codec); + } else if (message.getContentType() == MapCodec::contentType) { + MapCodec codec; + message.decode(codec); + } +} + + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h new file mode 100644 index 0000000000..5e28877305 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -0,0 +1,91 @@ +#ifndef QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H +#define QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include <boost/shared_ptr.hpp> +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/sys/BlockingQueue.h" +#include "qpid/sys/Time.h" + +namespace qpid { + +namespace framing{ +class FrameSet; +} + +namespace messaging { +class Message; +} + +namespace client { +namespace amqp0_10 { + +/** + * + */ +class IncomingMessages +{ + public: + typedef boost::shared_ptr<qpid::framing::FrameSet> FrameSetPtr; + class MessageTransfer + { + public: + const std::string& getDestination(); + void retrieve(qpid::messaging::Message* message); + private: + FrameSetPtr content; + IncomingMessages& parent; + + MessageTransfer(FrameSetPtr, IncomingMessages&); + friend class IncomingMessages; + }; + + struct Handler + { + virtual ~Handler() {} + virtual bool accept(MessageTransfer& transfer) = 0; + }; + + void setSession(qpid::client::AsyncSession session); + bool get(Handler& handler, qpid::sys::Duration timeout); + //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout); + //bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout); + void accept(); + void releaseAll(); + void releasePending(const std::string& destination); + private: + typedef std::deque<FrameSetPtr> FrameSetQueue; + + qpid::client::AsyncSession session; + qpid::framing::SequenceSet unaccepted; + boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming; + FrameSetQueue received; + + bool process(Handler*, qpid::sys::Duration); + void retrieve(FrameSetPtr, qpid::messaging::Message*); + +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h b/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h new file mode 100644 index 0000000000..d66d2ecb3c --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h @@ -0,0 +1,52 @@ +#ifndef QPID_CLIENT_AMQP0_10_MESSAGESINK_H +#define QPID_CLIENT_AMQP0_10_MESSAGESINK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/client/AsyncSession.h" + +namespace qpid { + +namespace messaging { +class Message; +} + +namespace client { +namespace amqp0_10 { + +class OutgoingMessage; + +/** + * + */ +class MessageSink +{ + public: + virtual ~MessageSink() {} + virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0; + virtual void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message) = 0; + virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0; + private: +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESINK_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h b/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h new file mode 100644 index 0000000000..74f2732f59 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h @@ -0,0 +1,47 @@ +#ifndef QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H +#define QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/client/AsyncSession.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +/** + * Abstraction behind which the AMQP 0-10 commands required to + * establish (and tear down) an incoming stream of messages from a + * given address are hidden. + */ +class MessageSource +{ + public: + virtual ~MessageSource() {} + virtual void subscribe(qpid::client::AsyncSession& session, const std::string& destination) = 0; + virtual void cancel(qpid::client::AsyncSession& session, const std::string& destination) = 0; + + private: +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp new file mode 100644 index 0000000000..716f955f98 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/OutgoingMessage.h" +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/client/amqp0_10/CodecsInternal.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::messaging::Address; +using qpid::messaging::MessageImplAccess; + +template <class T> void encode(const qpid::messaging::Message& from, qpid::client::Message& to) +{ + T codec; + MessageImplAccess::get(from).getEncodedContent(codec, to.getData()); + to.getMessageProperties().setContentType(T::contentType); +} + +void OutgoingMessage::convert(const qpid::messaging::Message& from) +{ + //TODO: need to avoid copying as much as possible + if (from.getContent().isList()) { + encode<ListCodec>(from, message); + } else if (from.getContent().isMap()) { + encode<MapCodec>(from, message); + } else { + message.setData(from.getBytes()); + message.getMessageProperties().setContentType(from.getContentType()); + } + const Address& address = from.getReplyTo(); + if (!address.value.empty()) { + message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); + } + translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders()); + //TODO: set other message properties + message.getDeliveryProperties().setRoutingKey(from.getSubject()); + //TODO: set other delivery properties +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h new file mode 100644 index 0000000000..8801e4e769 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h @@ -0,0 +1,46 @@ +#ifndef QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H +#define QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/Completion.h" +#include "qpid/client/Message.h" + +namespace qpid { +namespace messaging { +class Message; +} +namespace client { +namespace amqp0_10 { + +struct OutgoingMessage +{ + qpid::client::Message message; + qpid::client::Completion status; + + void convert(const qpid::messaging::Message&); +}; + + + +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp new file mode 100644 index 0000000000..31efff38a6 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -0,0 +1,190 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReceiverImpl.h" +#include "AddressResolution.h" +#include "MessageSource.h" +#include "SessionImpl.h" +#include "qpid/messaging/MessageListener.h" +#include "qpid/messaging/Receiver.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::messaging::Receiver; + +void ReceiverImpl::received(qpid::messaging::Message&) +{ + //TODO: should this be configurable + if (capacity && --window <= capacity/2) { + session.sendCompletion(); + window = capacity; + } +} + +qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout) +{ + qpid::messaging::Message result; + if (!get(result, timeout)) throw Receiver::NoMessageAvailable(); + return result; +} + +qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout) +{ + qpid::messaging::Message result; + if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable(); + return result; +} + +bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + Get f(*this, message, timeout); + while (!parent.execute(f)) {} + return f.result; +} + +bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + Fetch f(*this, message, timeout); + while (!parent.execute(f)) {} + return f.result; +} + +void ReceiverImpl::cancel() +{ + execute<Cancel>(); +} + +void ReceiverImpl::start() +{ + execute<Start>(); +} + +void ReceiverImpl::stop() +{ + execute<Stop>(); +} + +void ReceiverImpl::setCapacity(uint32_t c) +{ + execute1<SetCapacity>(c); +} + +void ReceiverImpl::startFlow() +{ + if (capacity > 0) { + session.messageSetFlowMode(destination, FLOW_MODE_WINDOW); + session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity); + session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit); + window = capacity; + } +} + +void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) +{ + + session = s; + if (state == UNRESOLVED) { + source = resolver.resolveSource(session, address, filter, options); + state = STOPPED;//TODO: if session is started, go straight to started + } + if (state == CANCELLED) { + source->cancel(session, destination); + parent.receiverCancelled(destination); + } else { + source->subscribe(session, destination); + if (state == STARTED) start(); + } +} + +void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; } +qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; } + +const std::string& ReceiverImpl::getName() const { return destination; } + +ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, + const qpid::messaging::Address& a, + const qpid::messaging::Filter* f, + const qpid::messaging::Variant::Map& o) : + + parent(p), destination(name), address(a), filter(f), options(o), byteCredit(0xFFFFFFFF), + state(UNRESOLVED), capacity(0), listener(0), window(0) {} + +bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + return parent.get(*this, message, timeout); +} + +bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + if (state == CANCELLED) return false;//TODO: or should this be an error? + + if (capacity == 0 || state != STARTED) { + session.messageSetFlowMode(destination, FLOW_MODE_CREDIT); + session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1); + session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF); + } + + if (getImpl(message, timeout)) { + return true; + } else { + sync(session).messageFlush(destination); + startFlow();//reallocate credit + return getImpl(message, 0); + } +} + +void ReceiverImpl::cancelImpl() +{ + if (state != CANCELLED) { + state = CANCELLED; + source->cancel(session, destination); + parent.receiverCancelled(destination); + } +} + +void ReceiverImpl::startImpl() +{ + if (state == STOPPED) { + state = STARTED; + startFlow(); + } +} + +void ReceiverImpl::stopImpl() +{ + state = STOPPED; + session.messageStop(destination); +} + +void ReceiverImpl::setCapacityImpl(uint32_t c) +{ + if (c != capacity) { + capacity = c; + if (state == STARTED) { + session.messageStop(destination); + startFlow(); + } + } +} + + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h new file mode 100644 index 0000000000..509c784513 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -0,0 +1,165 @@ +#ifndef QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H +#define QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Filter.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/ReceiverImpl.h" +#include "qpid/messaging/Variant.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/amqp0_10/SessionImpl.h" +#include "qpid/sys/Time.h" +#include <memory> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +class AddressResolution; +class MessageSource; + +/** + * A receiver implementation based on an AMQP 0-10 subscription. + */ +class ReceiverImpl : public qpid::messaging::ReceiverImpl +{ + public: + + enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED}; + + ReceiverImpl(SessionImpl& parent, const std::string& name, + const qpid::messaging::Address& address, + const qpid::messaging::Filter* filter, + const qpid::messaging::Variant::Map& options); + + void init(qpid::client::AsyncSession session, AddressResolution& resolver); + bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout); + qpid::messaging::Message get(qpid::sys::Duration timeout); + bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout); + qpid::messaging::Message fetch(qpid::sys::Duration timeout); + void cancel(); + void start(); + void stop(); + const std::string& getName() const; + void setCapacity(uint32_t); + void setListener(qpid::messaging::MessageListener* listener); + qpid::messaging::MessageListener* getListener(); + void received(qpid::messaging::Message& message); + private: + SessionImpl& parent; + const std::string destination; + const qpid::messaging::Address address; + const qpid::messaging::Filter* filter; + const qpid::messaging::Variant::Map options; + const uint32_t byteCredit; + State state; + + std::auto_ptr<MessageSource> source; + uint32_t capacity; + qpid::client::AsyncSession session; + qpid::messaging::MessageListener* listener; + uint32_t window; + + void startFlow(); + //implementation of public facing methods + bool fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout); + bool getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout); + void startImpl(); + void stopImpl(); + void cancelImpl(); + void setCapacityImpl(uint32_t); + + //functors for public facing methods (allows locking and retry + //logic to be centralised) + struct Command + { + ReceiverImpl& impl; + + Command(ReceiverImpl& i) : impl(i) {} + }; + + struct Get : Command + { + qpid::messaging::Message& message; + qpid::sys::Duration timeout; + bool result; + + Get(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) : + Command(i), message(m), timeout(t), result(false) {} + void operator()() { result = impl.getImpl(message, timeout); } + }; + + struct Fetch : Command + { + qpid::messaging::Message& message; + qpid::sys::Duration timeout; + bool result; + + Fetch(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) : + Command(i), message(m), timeout(t), result(false) {} + void operator()() { result = impl.fetchImpl(message, timeout); } + }; + + struct Stop : Command + { + Stop(ReceiverImpl& i) : Command(i) {} + void operator()() { impl.stopImpl(); } + }; + + struct Start : Command + { + Start(ReceiverImpl& i) : Command(i) {} + void operator()() { impl.startImpl(); } + }; + + struct Cancel : Command + { + Cancel(ReceiverImpl& i) : Command(i) {} + void operator()() { impl.cancelImpl(); } + }; + + struct SetCapacity : Command + { + uint32_t capacity; + + SetCapacity(ReceiverImpl& i, uint32_t c) : Command(i), capacity(c) {} + void operator()() { impl.setCapacityImpl(capacity); } + }; + + //helper templates for some common patterns + template <class F> void execute() + { + F f(*this); + parent.execute(f); + } + + template <class F, class P> void execute1(P p) + { + F f(*this, p); + parent.execute(f); + } +}; + +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp new file mode 100644 index 0000000000..c619d1226a --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SenderImpl.h" +#include "MessageSink.h" +#include "SessionImpl.h" +#include "AddressResolution.h" +#include "OutgoingMessage.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, + const qpid::messaging::Address& _address, + const qpid::messaging::Variant::Map& _options) : + parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED), + capacity(50), window(0) {} + +void SenderImpl::send(const qpid::messaging::Message& m) +{ + execute1<Send>(&m); +} + +void SenderImpl::cancel() +{ + execute<Cancel>(); +} + +void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) +{ + session = s; + if (state == UNRESOLVED) { + sink = resolver.resolveSink(session, address, options); + state = ACTIVE; + } + if (state == CANCELLED) { + sink->cancel(session, name); + parent.senderCancelled(name); + } else { + sink->declare(session, name); + replay(); + } +} + +void SenderImpl::sendImpl(const qpid::messaging::Message& m) +{ + //TODO: make recoding for replay optional + std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); + msg->convert(m); + outgoing.push_back(msg.release()); + sink->send(session, name, outgoing.back()); + if (++window > (capacity / 2)) {//TODO: make this configurable? + session.flush(); + checkPendingSends(); + window = 0; + } +} + +void SenderImpl::replay() +{ + for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { + sink->send(session, name, *i); + } +} + +void SenderImpl::checkPendingSends() +{ + while (!outgoing.empty() && outgoing.front().status.isComplete()) { + outgoing.pop_front(); + } +} + +void SenderImpl::cancelImpl() +{ + state = CANCELLED; + sink->cancel(session, name); + parent.senderCancelled(name); +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h new file mode 100644 index 0000000000..4ba793d71c --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -0,0 +1,118 @@ +#ifndef QPID_CLIENT_AMQP0_10_SENDERIMPL_H +#define QPID_CLIENT_AMQP0_10_SENDERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/SenderImpl.h" +#include "qpid/messaging/Variant.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/amqp0_10/SessionImpl.h" +#include <memory> +#include <boost/ptr_container/ptr_deque.hpp> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +class AddressResolution; +class MessageSink; +class OutgoingMessage; + +/** + * + */ +class SenderImpl : public qpid::messaging::SenderImpl +{ + public: + enum State {UNRESOLVED, ACTIVE, CANCELLED}; + + SenderImpl(SessionImpl& parent, const std::string& name, + const qpid::messaging::Address& address, + const qpid::messaging::Variant::Map& options); + void send(const qpid::messaging::Message&); + void cancel(); + void init(qpid::client::AsyncSession, AddressResolution&); + + private: + SessionImpl& parent; + const std::string name; + const qpid::messaging::Address address; + const qpid::messaging::Variant::Map options; + State state; + std::auto_ptr<MessageSink> sink; + + qpid::client::AsyncSession session; + std::string destination; + std::string routingKey; + + typedef boost::ptr_deque<OutgoingMessage> OutgoingMessages; + OutgoingMessages outgoing; + uint32_t capacity; + uint32_t window; + + void checkPendingSends(); + void replay(); + + //logic for application visible methods: + void sendImpl(const qpid::messaging::Message&); + void cancelImpl(); + + //functors for application visible methods (allowing locking and + //retry to be centralised): + struct Command + { + SenderImpl& impl; + + Command(SenderImpl& i) : impl(i) {} + }; + + struct Send : Command + { + const qpid::messaging::Message* message; + + Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {} + void operator()() { impl.sendImpl(*message); } + }; + + struct Cancel : Command + { + Cancel(SenderImpl& i) : Command(i) {} + void operator()() { impl.cancelImpl(); } + }; + + //helper templates for some common patterns + template <class F> void execute() + { + F f(*this); + parent.execute(f); + } + + template <class F, class P> void execute1(P p) + { + F f(*this, p); + parent.execute(f); + } +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_SENDERIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp new file mode 100644 index 0000000000..0e6c430d89 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -0,0 +1,375 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/SessionImpl.h" +#include "qpid/client/amqp0_10/ConnectionImpl.h" +#include "qpid/client/amqp0_10/ReceiverImpl.h" +#include "qpid/client/amqp0_10/SenderImpl.h" +#include "qpid/client/amqp0_10/MessageSource.h" +#include "qpid/client/amqp0_10/MessageSink.h" +#include "qpid/client/PrivateImplRef.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Filter.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/messaging/MessageListener.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Session.h" +#include "qpid/framing/reply_exceptions.h" +#include <boost/format.hpp> +#include <boost/function.hpp> +#include <boost/intrusive_ptr.hpp> + +using qpid::messaging::Filter; +using qpid::messaging::MessageImplAccess; +using qpid::messaging::Sender; +using qpid::messaging::Receiver; +using qpid::messaging::VariantMap; + +namespace qpid { +namespace client { +namespace amqp0_10 { + +SessionImpl::SessionImpl(ConnectionImpl& c) : connection(c) {} + + +void SessionImpl::sync() +{ + retry<Sync>(); +} + +void SessionImpl::flush() +{ + retry<Flush>(); +} + +void SessionImpl::commit() +{ + if (!execute<Commit>()) { + throw Exception();//TODO: what type? + } +} + +void SessionImpl::rollback() +{ + //If the session fails during this operation, the transaction will + //be rolled back anyway. + execute<Rollback>(); +} + +void SessionImpl::acknowledge() +{ + //Should probably throw an exception on failure here, or indicate + //it through a return type at least. Failure means that the + //message may be redelivered; i.e. the application cannot delete + //any state necessary for preventing reprocessing of the message + execute<Acknowledge>(); +} + +void SessionImpl::reject(qpid::messaging::Message& m) +{ + //Possibly want to somehow indicate failure here as well. Less + //clear need as compared to acknowledge however. + execute1<Reject>(m); +} + +void SessionImpl::close() +{ + connection.closed(*this); + session.close(); +} + +template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t) +{ + return boost::dynamic_pointer_cast<S>(qpid::client::PrivateImplRef<T>::get(t)); +} + +template <class T> void getFreeKey(std::string& key, T& map) +{ + std::string name = key; + int count = 1; + for (typename T::const_iterator i = map.find(name); i != map.end(); i = map.find(name)) { + name = (boost::format("%1%_%2%") % key % ++count).str(); + } + key = name; +} + + +void SessionImpl::setSession(qpid::client::Session s) +{ + qpid::sys::Mutex::ScopedLock l(lock); + session = s; + incoming.setSession(session); + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { + getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); + } + for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { + getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); + } +} + +struct SessionImpl::CreateReceiver : Command +{ + qpid::messaging::Receiver result; + const qpid::messaging::Address& address; + const Filter* filter; + const qpid::messaging::Variant::Map& options; + + CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a, const Filter* f, + const qpid::messaging::Variant::Map& o) : + Command(i), address(a), filter(f), options(o) {} + void operator()() { result = impl.createReceiverImpl(address, filter, options); } +}; + +Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const VariantMap& options) +{ + CreateReceiver f(*this, address, 0, options); + while (!execute(f)) {} + return f.result; +} + +Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, + const Filter& filter, const VariantMap& options) +{ + CreateReceiver f(*this, address, &filter, options); + while (!execute(f)) {} + return f.result; +} + +Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address, + const Filter* filter, const VariantMap& options) +{ + std::string name = address; + getFreeKey(name, receivers); + Receiver receiver(new ReceiverImpl(*this, name, address, filter, options)); + getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver); + receivers[name] = receiver; + return receiver; +} + +struct SessionImpl::CreateSender : Command +{ + qpid::messaging::Sender result; + const qpid::messaging::Address& address; + const qpid::messaging::Variant::Map& options; + + CreateSender(SessionImpl& i, const qpid::messaging::Address& a, + const qpid::messaging::Variant::Map& o) : + Command(i), address(a), options(o) {} + void operator()() { result = impl.createSenderImpl(address, options); } +}; + +Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options) +{ + CreateSender f(*this, address, options); + while (!execute(f)) {} + return f.result; +} + +Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address, const VariantMap& options) +{ + std::string name = address; + getFreeKey(name, senders); + Sender sender(new SenderImpl(*this, name, address, options)); + getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver); + senders[name] = sender; + return sender; +} + +qpid::messaging::Address SessionImpl::createTempQueue(const std::string& baseName) +{ + std::string name = baseName + std::string("_") + session.getId().getName(); + session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true); + return qpid::messaging::Address(name); +} + +SessionImpl& SessionImpl::convert(qpid::messaging::Session& s) +{ + boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s); + if (!impl) { + throw qpid::Exception(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl")); + } + return *impl; +} + +namespace { + +struct IncomingMessageHandler : IncomingMessages::Handler +{ + typedef boost::function1<bool, IncomingMessages::MessageTransfer&> Callback; + Callback callback; + + IncomingMessageHandler(Callback c) : callback(c) {} + + bool accept(IncomingMessages::MessageTransfer& transfer) + { + return callback(transfer); + } +}; + +} + +bool SessionImpl::accept(ReceiverImpl* receiver, + qpid::messaging::Message* message, + bool isDispatch, + IncomingMessages::MessageTransfer& transfer) +{ + if (receiver->getName() == transfer.getDestination()) { + transfer.retrieve(message); + if (isDispatch) { + qpid::sys::Mutex::ScopedUnlock u(lock); + qpid::messaging::MessageListener* listener = receiver->getListener(); + if (listener) listener->received(*message); + } + receiver->received(*message); + return true; + } else { + return false; + } +} + +bool SessionImpl::acceptAny(qpid::messaging::Message* message, bool isDispatch, IncomingMessages::MessageTransfer& transfer) +{ + Receivers::iterator i = receivers.find(transfer.getDestination()); + if (i == receivers.end()) { + QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination()); + return false; + } else { + boost::intrusive_ptr<ReceiverImpl> receiver = getImplPtr<Receiver, ReceiverImpl>(i->second); + return receiver && (!isDispatch || receiver->getListener()) && accept(receiver.get(), message, isDispatch, transfer); + } +} + +bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout) +{ + return incoming.get(handler, timeout); +} + +bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, false, _1)); + return getIncoming(handler, timeout); +} + +bool SessionImpl::dispatch(qpid::sys::Duration timeout) +{ + qpid::sys::Mutex::ScopedLock l(lock); + while (true) { + try { + qpid::messaging::Message message; + IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, true, _1)); + return getIncoming(handler, timeout); + } catch (TransportFailure&) { + reconnect(); + } + } +} + +bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + qpid::sys::Mutex::ScopedLock l(lock); + while (true) { + try { + IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, false, _1)); + return getIncoming(handler, timeout); + } catch (TransportFailure&) { + reconnect(); + } + } +} + +void SessionImpl::syncImpl() +{ + session.sync(); +} + +void SessionImpl::flushImpl() +{ + session.flush(); +} + + +void SessionImpl::commitImpl() +{ + incoming.accept(); + session.txCommit(); +} + +void SessionImpl::rollbackImpl() +{ + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.stop(); + //ensure that stop has been processed and all previously sent + //messages are available for release: + session.sync(); + incoming.releaseAll(); + session.txRollback(); + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.start(); +} + +void SessionImpl::acknowledgeImpl() +{ + incoming.accept(); +} + +void SessionImpl::rejectImpl(qpid::messaging::Message& m) +{ + SequenceSet set; + set.add(MessageImplAccess::get(m).getInternalId()); + session.messageReject(set); +} + +qpid::messaging::Message SessionImpl::fetch(qpid::sys::Duration timeout) +{ + qpid::messaging::Message result; + if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable(); + return result; +} + +void SessionImpl::receiverCancelled(const std::string& name) +{ + receivers.erase(name); + session.sync(); + incoming.releasePending(name); +} + +void SessionImpl::senderCancelled(const std::string& name) +{ + senders.erase(name); +} + +void SessionImpl::reconnect() +{ + connection.reconnect(); +} + +void* SessionImpl::getLastConfirmedSent() +{ + return 0; +} + +void* SessionImpl::getLastConfirmedAcknowledged() +{ + return 0; +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h new file mode 100644 index 0000000000..1c7db17bbb --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -0,0 +1,202 @@ +#ifndef QPID_CLIENT_AMQP0_10_SESSIONIMPL_H +#define QPID_CLIENT_AMQP0_10_SESSIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/SessionImpl.h" +#include "qpid/messaging/Variant.h" +#include "qpid/client/Session.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/client/amqp0_10/IncomingMessages.h" +#include "qpid/sys/Mutex.h" + +namespace qpid { + +namespace messaging { +struct Address; +struct Filter; +class Message; +class Receiver; +class Sender; +class Session; +} + +namespace client { +namespace amqp0_10 { + +class ConnectionImpl; +class ReceiverImpl; +class SenderImpl; + +/** + * Implementation of the protocol independent Session interface using + * AMQP 0-10. + */ +class SessionImpl : public qpid::messaging::SessionImpl +{ + public: + SessionImpl(ConnectionImpl&); + void commit(); + void rollback(); + void acknowledge(); + void reject(qpid::messaging::Message&); + void close(); + void sync(); + void flush(); + qpid::messaging::Address createTempQueue(const std::string& baseName); + qpid::messaging::Sender createSender(const qpid::messaging::Address& address, + const qpid::messaging::VariantMap& options); + qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address, + const qpid::messaging::VariantMap& options); + qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address, + const qpid::messaging::Filter& filter, + const qpid::messaging::VariantMap& options); + + void* getLastConfirmedSent(); + void* getLastConfirmedAcknowledged(); + + bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout); + qpid::messaging::Message fetch(qpid::sys::Duration timeout); + bool dispatch(qpid::sys::Duration timeout); + + bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout); + + void receiverCancelled(const std::string& name); + void senderCancelled(const std::string& name); + + void setSession(qpid::client::Session); + + template <class T> bool execute(T& f) + { + try { + qpid::sys::Mutex::ScopedLock l(lock); + f(); + return true; + } catch (TransportFailure&) { + reconnect(); + return false; + } + } + + static SessionImpl& convert(qpid::messaging::Session&); + + private: + typedef std::map<std::string, qpid::messaging::Receiver> Receivers; + typedef std::map<std::string, qpid::messaging::Sender> Senders; + + qpid::sys::Mutex lock; + ConnectionImpl& connection; + qpid::client::Session session; + AddressResolution resolver; + IncomingMessages incoming; + Receivers receivers; + Senders senders; + + bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); + bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); + bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout); + void reconnect(); + + void commitImpl(); + void rollbackImpl(); + void acknowledgeImpl(); + void rejectImpl(qpid::messaging::Message&); + void closeImpl(); + void syncImpl(); + void flushImpl(); + qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address, + const qpid::messaging::VariantMap& options); + qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address, + const qpid::messaging::Filter* filter, + const qpid::messaging::VariantMap& options); + + //functors for public facing methods (allows locking and retry + //logic to be centralised) + struct Command + { + SessionImpl& impl; + + Command(SessionImpl& i) : impl(i) {} + }; + + struct Commit : Command + { + Commit(SessionImpl& i) : Command(i) {} + void operator()() { impl.commitImpl(); } + }; + + struct Rollback : Command + { + Rollback(SessionImpl& i) : Command(i) {} + void operator()() { impl.rollbackImpl(); } + }; + + struct Acknowledge : Command + { + Acknowledge(SessionImpl& i) : Command(i) {} + void operator()() { impl.acknowledgeImpl(); } + }; + + struct Sync : Command + { + Sync(SessionImpl& i) : Command(i) {} + void operator()() { impl.syncImpl(); } + }; + + struct Flush : Command + { + Flush(SessionImpl& i) : Command(i) {} + void operator()() { impl.flushImpl(); } + }; + + struct Reject : Command + { + qpid::messaging::Message& message; + + Reject(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} + void operator()() { impl.rejectImpl(message); } + }; + + struct CreateSender; + struct CreateReceiver; + + //helper templates for some common patterns + template <class F> bool execute() + { + F f(*this); + return execute(f); + } + + template <class F> void retry() + { + while (!execute<F>()) {} + } + + template <class F, class P> bool execute1(P p) + { + F f(*this, p); + return execute(f); + } +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_SESSIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index ac418ffbb6..2e557f2ab6 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -213,7 +213,7 @@ class MessageUpdater { framing::MessageTransferBody transfer( framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); - sb.get()->send(transfer, message.payload->getFrames()); + sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); diff --git a/qpid/cpp/src/qpid/framing/FieldTable.cpp b/qpid/cpp/src/qpid/framing/FieldTable.cpp index bc832e9db4..255a3b74a4 100644 --- a/qpid/cpp/src/qpid/framing/FieldTable.cpp +++ b/qpid/cpp/src/qpid/framing/FieldTable.cpp @@ -185,13 +185,8 @@ template <class T, int width, uint8_t typecode> bool getRawFixedWidthValue(FieldTable::ValuePtr vptr, T& value) { if (vptr && vptr->getType() == typecode) { - FixedWidthValue<width>* fwv = dynamic_cast< FixedWidthValue<width>* >(&vptr->getData()); - if (fwv) { - uint8_t* const octets = Endian::convertIfRequired(fwv->rawOctets(), width); - uint8_t* const target = reinterpret_cast<uint8_t*>(&value); - for (uint i = 0; i < width; ++i) target[i] = octets[i]; - return true; - } + value = vptr->get<T>(); + return true; } return false; } @@ -370,5 +365,16 @@ void FieldTable::erase(const std::string& name) values.erase(name); } +std::pair<FieldTable::ValueMap::iterator, bool> FieldTable::insert(const ValueMap::value_type& value) +{ + return values.insert(value); +} + +FieldTable::ValueMap::iterator FieldTable::insert(ValueMap::iterator position, const ValueMap::value_type& value) +{ + return values.insert(position, value); +} + + } } diff --git a/qpid/cpp/src/qpid/framing/FieldValue.cpp b/qpid/cpp/src/qpid/framing/FieldValue.cpp index 5f7248a751..5bac931b83 100644 --- a/qpid/cpp/src/qpid/framing/FieldValue.cpp +++ b/qpid/cpp/src/qpid/framing/FieldValue.cpp @@ -22,6 +22,7 @@ #include "qpid/framing/Array.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/Endian.h" +#include "qpid/framing/List.h" #include "qpid/framing/reply_exceptions.h" namespace qpid { @@ -37,6 +38,8 @@ void FieldValue::setType(uint8_t type) typeOctet = type; if (typeOctet == 0xA8) { data.reset(new EncodedValue<FieldTable>()); + } else if (typeOctet == 0xA9) { + data.reset(new EncodedValue<List>()); } else if (typeOctet == 0xAA) { data.reset(new EncodedValue<Array>()); } else { @@ -164,10 +167,37 @@ FieldTableValue::FieldTableValue(const FieldTable& f) : FieldValue(0xa8, new Enc { } +ListValue::ListValue(const List& l) : FieldValue(0xa9, new EncodedValue<List>(l)) +{ +} + ArrayValue::ArrayValue(const Array& a) : FieldValue(0xaa, new EncodedValue<Array>(a)) { } +VoidValue::VoidValue() : FieldValue(0xf0, new FixedWidthValue<0>()) {} + +BoolValue::BoolValue(bool b) : + FieldValue(0x08, new FixedWidthValue<1>(b)) +{} + +Unsigned8Value::Unsigned8Value(uint8_t v) : + FieldValue(0x02, new FixedWidthValue<1>(v)) +{} +Unsigned16Value::Unsigned16Value(uint16_t v) : + FieldValue(0x12, new FixedWidthValue<2>(v)) +{} +Unsigned32Value::Unsigned32Value(uint32_t v) : + FieldValue(0x22, new FixedWidthValue<4>(v)) +{} + +Integer8Value::Integer8Value(int8_t v) : + FieldValue(0x01, new FixedWidthValue<1>(v)) +{} +Integer16Value::Integer16Value(int16_t v) : + FieldValue(0x11, new FixedWidthValue<2>(v)) +{} + void FieldValue::print(std::ostream& out) const { data->print(out); out << TypeCode(typeOctet) << '('; @@ -177,4 +207,9 @@ void FieldValue::print(std::ostream& out) const { out << ')'; } +uint8_t* FieldValue::convertIfRequired(uint8_t* const octets, int width) +{ + return Endian::convertIfRequired(octets, width); +} + }} diff --git a/qpid/cpp/src/qpid/framing/List.cpp b/qpid/cpp/src/qpid/framing/List.cpp new file mode 100644 index 0000000000..bde7dabbac --- /dev/null +++ b/qpid/cpp/src/qpid/framing/List.cpp @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/List.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace framing { + +uint32_t List::encodedSize() const +{ + uint32_t len(4/*size*/ + 4/*count*/); + for(Values::const_iterator i = values.begin(); i != values.end(); ++i) { + len += (*i)->encodedSize(); + } + return len; +} + +void List::encode(Buffer& buffer) const +{ + buffer.putLong(encodedSize() - 4); + buffer.putLong(size()); + for (Values::const_iterator i = values.begin(); i!=values.end(); ++i) { + (*i)->encode(buffer); + } +} + +void List::decode(Buffer& buffer) +{ + values.clear(); + uint32_t size = buffer.getLong(); + uint32_t available = buffer.available(); + if (available < size) { + throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected " + << size << " bytes but only " << available << " available")); + } + if (size) { + uint32_t count = buffer.getLong(); + for (uint32_t i = 0; i < count; i++) { + ValuePtr value(new FieldValue); + value->decode(buffer); + values.push_back(value); + } + } +} + + +bool List::operator==(const List& other) const { + return values.size() == other.values.size() && + std::equal(values.begin(), values.end(), other.values.begin()); +} + +std::ostream& operator<<(std::ostream& out, const List& l) +{ + out << "{"; + for(List::Values::const_iterator i = l.values.begin(); i != l.values.end(); ++i) { + if (i != l.values.begin()) out << ", "; + (*i)->print(out); + } + return out << "}"; +} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp index c0b41c6906..71fa6a7329 100644 --- a/qpid/cpp/src/qpid/framing/Uuid.cpp +++ b/qpid/cpp/src/qpid/framing/Uuid.cpp @@ -17,6 +17,8 @@ */ #include "qpid/framing/Uuid.h" + +#include "qpid/sys/uuid.h" #include "qpid/Exception.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" @@ -28,6 +30,35 @@ using namespace std; static const size_t UNPARSED_SIZE=36; +Uuid::Uuid(bool unique) { + if (unique) { + generate(); + } else { + clear(); + } +} + +Uuid::Uuid(const uint8_t* data) { + assign(data); +} + +void Uuid::assign(const uint8_t* data) { + uuid_copy(c_array(), data); +} + +void Uuid::generate() { + uuid_generate(c_array()); +} + +void Uuid::clear() { + uuid_clear(c_array()); +} + +// Force int 0/!0 to false/true; avoids compile warnings. +bool Uuid::isNull() { + return !!uuid_is_null(data()); +} + void Uuid::encode(Buffer& buf) const { buf.putRawData(data(), size()); } diff --git a/qpid/cpp/src/qpid/management/Manageable.cpp b/qpid/cpp/src/qpid/management/Manageable.cpp index e487dfc455..a3593e73e3 100644 --- a/qpid/cpp/src/qpid/management/Manageable.cpp +++ b/qpid/cpp/src/qpid/management/Manageable.cpp @@ -33,7 +33,7 @@ string Manageable::StatusText (status_t status, string text) case STATUS_UNKNOWN_OBJECT : return "UnknownObject"; case STATUS_UNKNOWN_METHOD : return "UnknownMethod"; case STATUS_NOT_IMPLEMENTED : return "NotImplemented"; - case STATUS_INVALID_PARAMETER : return "InvalidParameter"; + case STATUS_PARAMETER_INVALID : return "InvalidParameter"; case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented"; case STATUS_FORBIDDEN : return "Forbidden"; } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 2df10b1e95..0e462342d4 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -535,8 +535,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey } else { if ((iter->second->getPackageName() != packageName) || (iter->second->getClassName() != className)) { - outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER); - outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); + outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); + outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); } else try { diff --git a/qpid/cpp/src/qpid/messaging/Address.cpp b/qpid/cpp/src/qpid/messaging/Address.cpp new file mode 100644 index 0000000000..ed35054a00 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Address.cpp @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" + +namespace qpid { +namespace client { +} + +namespace messaging { + +Address::Address() {} +Address::Address(const std::string& address) : value(address) {} +Address::Address(const std::string& address, const std::string& t) : value(address), type(t) {} +Address::operator const std::string&() const { return value; } +const std::string& Address::toStr() const { return value; } +Address::operator bool() const { return !value.empty(); } +bool Address::operator !() const { return value.empty(); } + +const std::string TYPE_SEPARATOR(":"); + +std::ostream& operator<<(std::ostream& out, const Address& address) +{ + if (!address.type.empty()) { + out << address.type; + out << TYPE_SEPARATOR; + } + out << address.value; + return out; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp new file mode 100644 index 0000000000..feb6566008 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Connection.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/ConnectionImpl.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/SessionImpl.h" +#include "qpid/client/PrivateImplRef.h" +#include "qpid/client/amqp0_10/ConnectionImpl.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace client { + +typedef PrivateImplRef<qpid::messaging::Connection> PI; + +} + +namespace messaging { + +using qpid::client::PI; + +Connection Connection::open(const std::string& url, const Variant::Map& options) +{ + //only support amqp 0-10 at present + Connection connection(new qpid::client::amqp0_10::ConnectionImpl(url, options)); + return connection; +} + +Connection::Connection(ConnectionImpl* impl) { PI::ctor(*this, impl); } +Connection::Connection(const Connection& c) : qpid::client::Handle<ConnectionImpl>() { PI::copy(*this, c); } +Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); } +Connection::~Connection() { PI::dtor(*this); } + +void Connection::close() { impl->close(); } +Session Connection::newSession() { return impl->newSession(); } + +InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {} + +void parseKeyValuePair(const std::string& in, Variant::Map& out) +{ + std::string::size_type i = in.find('='); + if (i == std::string::npos || i == in.size() || in.find('=', i+1) != std::string::npos) { + throw InvalidOptionString(QPID_MSG("Cannot parse name-value pair from " << in)); + } else { + out[in.substr(0, i)] = in.substr(i+1); + } +} + +void parseOptionString(const std::string& in, Variant::Map& out) +{ + std::string::size_type start = 0; + std::string::size_type i = in.find('&'); + while (i != std::string::npos) { + parseKeyValuePair(in.substr(start, i-start), out); + if (i < in.size()) { + start = i+1; + i = in.find('&', start); + } else { + i = std::string::npos; + } + } + parseKeyValuePair(in.substr(start), out); +} + +Variant::Map parseOptionString(const std::string& in) +{ + Variant::Map map; + parseOptionString(in, map); + return map; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h new file mode 100644 index 0000000000..aa9e5b5fbe --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h @@ -0,0 +1,45 @@ +#ifndef QPID_MESSAGING_CONNECTIONIMPL_H +#define QPID_MESSAGING_CONNECTIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/RefCounted.h" + +namespace qpid { +namespace client { +} + +namespace messaging { + +class Session; + +class ConnectionImpl : public virtual qpid::RefCounted +{ + public: + virtual ~ConnectionImpl() {} + virtual void close() = 0; + virtual Session newSession() = 0; + private: +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_CONNECTIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Filter.cpp b/qpid/cpp/src/qpid/messaging/Filter.cpp new file mode 100644 index 0000000000..b06cbdb373 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Filter.cpp @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Filter.h" + +namespace qpid { +namespace client { +} + +namespace messaging { + +Filter::Filter(std::string t, std::string pattern) : type(t) { patterns.push_back(pattern); } +Filter::Filter(std::string t, std::string pattern1, std::string pattern2) : type(t) +{ + patterns.push_back(pattern1); + patterns.push_back(pattern2); +} + +const std::string Filter::WILDCARD("WILDCARD"); +const std::string Filter::EXACT_MATCH("EXACT_MATCH"); + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/Message.cpp b/qpid/cpp/src/qpid/messaging/Message.cpp new file mode 100644 index 0000000000..1d844b3027 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Message.cpp @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" + +namespace qpid { +namespace messaging { + +Message::Message(const std::string& bytes) : impl(new MessageImpl(bytes)) {} +Message::Message(const char* bytes, size_t count) : impl(new MessageImpl(bytes, count)) {} + +Message::Message(const Message& m) : impl(new MessageImpl(m.getBytes())) {} +Message::~Message() { delete impl; } + +Message& Message::operator=(const Message& m) { *impl = *m.impl; return *this; } + +void Message::setReplyTo(const Address& d) { impl->setReplyTo(d); } +const Address& Message::getReplyTo() const { return impl->getReplyTo(); } + +void Message::setSubject(const std::string& s) { impl->setSubject(s); } +const std::string& Message::getSubject() const { return impl->getSubject(); } + +void Message::setContentType(const std::string& s) { impl->setContentType(s); } +const std::string& Message::getContentType() const { return impl->getContentType(); } + +const VariantMap& Message::getHeaders() const { return impl->getHeaders(); } +VariantMap& Message::getHeaders() { return impl->getHeaders(); } + +void Message::setBytes(const std::string& c) { impl->setBytes(c); } +void Message::setBytes(const char* chars, size_t count) { impl->setBytes(chars, count); } +const std::string& Message::getBytes() const { return impl->getBytes(); } +std::string& Message::getBytes() { return impl->getBytes(); } + +const char* Message::getRawContent() const { return impl->getBytes().data(); } +size_t Message::getContentSize() const { return impl->getBytes().size(); } + +MessageContent& Message::getContent() { return *impl; } +const MessageContent& Message::getContent() const { return *impl; } +void Message::setContent(const std::string& s) { *impl = s; } +void Message::setContent(const Variant::Map& m) { *impl = m; } +void Message::setContent(const Variant::List& l) { *impl = l; } + +void Message::encode(Codec& codec) { impl->encode(codec); } + +void Message::decode(Codec& codec) { impl->decode(codec); } + +std::ostream& operator<<(std::ostream& out, const MessageContent& content) +{ + return content.print(out); +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp new file mode 100644 index 0000000000..5df9218e03 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp @@ -0,0 +1,205 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MessageImpl.h" +#include "qpid/messaging/Message.h" + +namespace qpid { +namespace messaging { + +namespace { +const std::string EMPTY_STRING = ""; +} + +MessageImpl::MessageImpl(const std::string& c) : bytes(c), type(VAR_VOID), internalId(0) {} +MessageImpl::MessageImpl(const char* chars, size_t count) : bytes(chars, count), type(VAR_VOID), internalId(0) {} + +void MessageImpl::setReplyTo(const Address& d) { replyTo = d; } +const Address& MessageImpl::getReplyTo() const { return replyTo; } + +void MessageImpl::setSubject(const std::string& s) { subject = s; } +const std::string& MessageImpl::getSubject() const { return subject; } + +void MessageImpl::setContentType(const std::string& s) { contentType = s; } +const std::string& MessageImpl::getContentType() const { return contentType; } + +const VariantMap& MessageImpl::getHeaders() const { return headers; } +VariantMap& MessageImpl::getHeaders() { return headers; } + +//should these methods be on MessageContent? +void MessageImpl::setBytes(const std::string& c) { clear(); bytes = c; } +void MessageImpl::setBytes(const char* chars, size_t count) { clear(); bytes.assign(chars, count); } +const std::string& MessageImpl::getBytes() const { return bytes; } +std::string& MessageImpl::getBytes() { return bytes; } + + +Variant& MessageImpl::operator[](const std::string& key) { return asMap()[key]; } + +std::ostream& MessageImpl::print(std::ostream& out) const +{ + if (type == VAR_MAP) { + return out << content.asMap(); + } else if (type == VAR_LIST) { + return out << content.asList(); + } else { + return out << bytes; + } +} + +template <class T> MessageContent& MessageImpl::append(T& t) +{ + if (type == VAR_VOID) { + //TODO: this is inefficient, probably want to hold on to the stream object + std::stringstream s; + s << bytes; + s << t; + bytes = s.str(); + } else if (type == VAR_LIST) { + content.asList().push_back(Variant(t)); + } else { + throw InvalidConversion("<< operator only valid on strings and lists"); + } + return *this; +} + +MessageContent& MessageImpl::operator<<(const std::string& v) { return append(v); } +MessageContent& MessageImpl::operator<<(const char* v) { return append(v); } +MessageContent& MessageImpl::operator<<(bool v) { return append(v); } +MessageContent& MessageImpl::operator<<(int8_t v) { return append(v); } +MessageContent& MessageImpl::operator<<(int16_t v) { return append(v); } +MessageContent& MessageImpl::operator<<(int32_t v) { return append(v); } +MessageContent& MessageImpl::operator<<(int64_t v) { return append(v); } +MessageContent& MessageImpl::operator<<(uint8_t v) { return append(v); } +MessageContent& MessageImpl::operator<<(uint16_t v) { return append(v); } +MessageContent& MessageImpl::operator<<(uint32_t v) { return append(v); } +MessageContent& MessageImpl::operator<<(uint64_t v) { return append(v); } +MessageContent& MessageImpl::operator<<(double v) { return append(v); } +MessageContent& MessageImpl::operator<<(float v) { return append(v); } +MessageContent& MessageImpl::operator=(const std::string& s) +{ + type = VAR_VOID; + bytes = s; + return *this; +} +MessageContent& MessageImpl::operator=(const char* c) +{ + type = VAR_VOID; + bytes = c; + return *this; +} +MessageContent& MessageImpl::operator=(const Variant::Map& m) +{ + type = VAR_MAP; + content = m; + return *this; +} + +MessageContent& MessageImpl::operator=(const Variant::List& l) +{ + type = VAR_LIST; + content = l; + return *this; +} + +void MessageImpl::encode(Codec& codec) +{ + if (content.getType() != VAR_VOID) { + bytes = EMPTY_STRING; + codec.encode(content, bytes); + } +} + +void MessageImpl::getEncodedContent(Codec& codec, std::string& out) const +{ + if (content.getType() != VAR_VOID) { + codec.encode(content, out); + } else { + out = bytes; + } +} + +void MessageImpl::decode(Codec& codec) +{ + codec.decode(bytes, content); + if (content.getType() == VAR_MAP) type = VAR_MAP; + else if (content.getType() == VAR_LIST) type = VAR_LIST; + else type = VAR_VOID;//TODO: what if codec set some type other than map or list?? +} + +void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; } +qpid::framing::SequenceNumber MessageImpl::getInternalId() { return internalId; } + +bool MessageImpl::isVoid() const { return type == VAR_VOID; } + +const std::string& MessageImpl::asString() const +{ + if (isVoid()) return getBytes(); + else return content.getString();//will throw an error +} +std::string& MessageImpl::asString() +{ + if (isVoid()) return getBytes(); + else return content.getString();//will throw an error +} + +const char* MessageImpl::asChars() const +{ + if (!isVoid()) throw InvalidConversion("Content is of structured type."); + return bytes.data(); +} +size_t MessageImpl::size() const +{ + return bytes.size(); +} + +const Variant::Map& MessageImpl::asMap() const { return content.asMap(); } +Variant::Map& MessageImpl::asMap() +{ + if (isVoid()) { + content = Variant::Map(); + type = VAR_MAP; + } + return content.asMap(); +} +bool MessageImpl::isMap() const { return type == VAR_MAP; } + +const Variant::List& MessageImpl::asList() const { return content.asList(); } +Variant::List& MessageImpl::asList() +{ + if (isVoid()) { + content = Variant::List(); + type = VAR_LIST; + } + return content.asList(); +} +bool MessageImpl::isList() const { return type == VAR_LIST; } + +void MessageImpl::clear() { bytes = EMPTY_STRING; content.reset(); type = VAR_VOID; } + +MessageImpl& MessageImplAccess::get(Message& msg) +{ + return *msg.impl; +} +const MessageImpl& MessageImplAccess::get(const Message& msg) +{ + return *msg.impl; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.h b/qpid/cpp/src/qpid/messaging/MessageImpl.h new file mode 100644 index 0000000000..1173e7570a --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.h @@ -0,0 +1,134 @@ +#ifndef QPID_MESSAGING_MESSAGEIMPL_H +#define QPID_MESSAGING_MESSAGEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Codec.h" +#include "qpid/messaging/MessageContent.h" +#include "qpid/messaging/Variant.h" +#include "qpid/framing/SequenceNumber.h" + +namespace qpid { +namespace messaging { + +struct MessageImpl : MessageContent +{ + Address replyTo; + std::string subject; + std::string contentType; + Variant::Map headers; + + std::string bytes; + Variant content;//used only for LIST and MAP + VariantType type;//if LIST, MAP content holds the value; if VOID bytes holds the value + + qpid::framing::SequenceNumber internalId; + + MessageImpl(const std::string& c); + MessageImpl(const char* chars, size_t count); + + void setReplyTo(const Address& d); + const Address& getReplyTo() const; + + void setSubject(const std::string& s); + const std::string& getSubject() const; + + void setContentType(const std::string& s); + const std::string& getContentType() const; + + const Variant::Map& getHeaders() const; + Variant::Map& getHeaders(); + + void setBytes(const std::string& bytes); + void setBytes(const char* chars, size_t count); + const std::string& getBytes() const; + std::string& getBytes(); + + void setInternalId(qpid::framing::SequenceNumber id); + qpid::framing::SequenceNumber getInternalId(); + + bool isVoid() const; + + const std::string& asString() const; + std::string& asString(); + + const char* asChars() const; + size_t size() const; + + const Variant::Map& asMap() const; + Variant::Map& asMap(); + bool isMap() const; + + const Variant::List& asList() const; + Variant::List& asList(); + bool isList() const; + + void clear(); + + void getEncodedContent(Codec& codec, std::string&) const; + void encode(Codec& codec); + void decode(Codec& codec); + + Variant& operator[](const std::string&); + + std::ostream& print(std::ostream& out) const; + + //operator<< for variety of types... + MessageContent& operator<<(const std::string&); + MessageContent& operator<<(const char*); + MessageContent& operator<<(bool); + MessageContent& operator<<(int8_t); + MessageContent& operator<<(int16_t); + MessageContent& operator<<(int32_t); + MessageContent& operator<<(int64_t); + MessageContent& operator<<(uint8_t); + MessageContent& operator<<(uint16_t); + MessageContent& operator<<(uint32_t); + MessageContent& operator<<(uint64_t); + MessageContent& operator<<(double); + MessageContent& operator<<(float); + + //assignment from string, map and list + MessageContent& operator=(const std::string&); + MessageContent& operator=(const char*); + MessageContent& operator=(const Variant::Map&); + MessageContent& operator=(const Variant::List&); + + template <class T> MessageContent& append(T& t); +}; + +class Message; + +/** + * Provides access to the internal MessageImpl for a message which is + * useful when accessing any message state not exposed directly + * through the public API. + */ +struct MessageImplAccess +{ + static MessageImpl& get(Message&); + static const MessageImpl& get(const Message&); +}; + +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_MESSAGEIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Receiver.cpp b/qpid/cpp/src/qpid/messaging/Receiver.cpp new file mode 100644 index 0000000000..2e8b89d27f --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Receiver.cpp @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/ReceiverImpl.h" +#include "qpid/client/PrivateImplRef.h" + +namespace qpid { +namespace client { + +typedef PrivateImplRef<qpid::messaging::Receiver> PI; + +} + +namespace messaging { + +using qpid::client::PI; + +Receiver::Receiver(ReceiverImpl* impl) { PI::ctor(*this, impl); } +Receiver::Receiver(const Receiver& s) : qpid::client::Handle<ReceiverImpl>() { PI::copy(*this, s); } +Receiver::~Receiver() { PI::dtor(*this); } +Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); } +bool Receiver::get(Message& message, qpid::sys::Duration timeout) { return impl->get(message, timeout); } +Message Receiver::get(qpid::sys::Duration timeout) { return impl->get(timeout); } +bool Receiver::fetch(Message& message, qpid::sys::Duration timeout) { return impl->fetch(message, timeout); } +Message Receiver::fetch(qpid::sys::Duration timeout) { return impl->fetch(timeout); } +void Receiver::start() { impl->start(); } +void Receiver::stop() { impl->stop(); } +void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); } +void Receiver::cancel() { impl->cancel(); } +void Receiver::setListener(MessageListener* listener) { impl->setListener(listener); } + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h new file mode 100644 index 0000000000..77697b730c --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h @@ -0,0 +1,52 @@ +#ifndef QPID_MESSAGING_RECEIVERIMPL_H +#define QPID_MESSAGING_RECEIVERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/RefCounted.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace client { +} + +namespace messaging { + +class Message; +class MessageListener; + +class ReceiverImpl : public virtual qpid::RefCounted +{ + public: + virtual ~ReceiverImpl() {} + virtual bool get(Message& message, qpid::sys::Duration timeout) = 0; + virtual Message get(qpid::sys::Duration timeout) = 0; + virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0; + virtual Message fetch(qpid::sys::Duration timeout) = 0; + virtual void start() = 0; + virtual void stop() = 0; + virtual void setCapacity(uint32_t) = 0; + virtual void cancel() = 0; + virtual void setListener(MessageListener*) = 0; +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_RECEIVERIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Sender.cpp b/qpid/cpp/src/qpid/messaging/Sender.cpp new file mode 100644 index 0000000000..8db700b060 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Sender.cpp @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/SenderImpl.h" +#include "qpid/client/PrivateImplRef.h" + +namespace qpid { +namespace client { + +typedef PrivateImplRef<qpid::messaging::Sender> PI; + +} + +namespace messaging { + +using qpid::client::PI; + +Sender::Sender(SenderImpl* impl) { PI::ctor(*this, impl); } +Sender::Sender(const Sender& s) : qpid::client::Handle<SenderImpl>() { PI::copy(*this, s); } +Sender::~Sender() { PI::dtor(*this); } +Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); } +void Sender::send(const Message& message) { impl->send(message); } +void Sender::cancel() { impl->cancel(); } + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/SenderImpl.h b/qpid/cpp/src/qpid/messaging/SenderImpl.h new file mode 100644 index 0000000000..77d2cfaeaf --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/SenderImpl.h @@ -0,0 +1,44 @@ +#ifndef QPID_MESSAGING_SENDERIMPL_H +#define QPID_MESSAGING_SENDERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/RefCounted.h" + +namespace qpid { +namespace client { +} + +namespace messaging { + +class Message; + +class SenderImpl : public virtual qpid::RefCounted +{ + public: + virtual ~SenderImpl() {} + virtual void send(const Message& message) = 0; + virtual void cancel() = 0; + private: +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_SENDERIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Session.cpp b/qpid/cpp/src/qpid/messaging/Session.cpp new file mode 100644 index 0000000000..284b20dacc --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Session.cpp @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Filter.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/SessionImpl.h" +#include "qpid/client/PrivateImplRef.h" + +namespace qpid { +namespace client { + +typedef PrivateImplRef<qpid::messaging::Session> PI; + +} + +namespace messaging { + +using qpid::client::PI; + +Session::Session(SessionImpl* impl) { PI::ctor(*this, impl); } +Session::Session(const Session& s) : qpid::client::Handle<SessionImpl>() { PI::copy(*this, s); } +Session::~Session() { PI::dtor(*this); } +Session& Session::operator=(const Session& s) { return PI::assign(*this, s); } +void Session::commit() { impl->commit(); } +void Session::rollback() { impl->rollback(); } +void Session::acknowledge() { impl->acknowledge(); } +void Session::reject(Message& m) { impl->reject(m); } +void Session::close() { impl->close(); } + +Sender Session::createSender(const Address& address, const VariantMap& options) +{ + return impl->createSender(address, options); +} +Receiver Session::createReceiver(const Address& address, const VariantMap& options) +{ + return impl->createReceiver(address, options); +} +Receiver Session::createReceiver(const Address& address, const Filter& filter, const VariantMap& options) +{ + return impl->createReceiver(address, filter, options); +} + +Sender Session::createSender(const std::string& address, const VariantMap& options) +{ + return impl->createSender(Address(address), options); +} +Receiver Session::createReceiver(const std::string& address, const VariantMap& options) +{ + return impl->createReceiver(Address(address), options); +} +Receiver Session::createReceiver(const std::string& address, const Filter& filter, const VariantMap& options) +{ + return impl->createReceiver(Address(address), filter, options); +} + +Address Session::createTempQueue(const std::string& baseName) +{ + return impl->createTempQueue(baseName); +} + +void Session::sync() +{ + impl->sync(); +} + +void Session::flush() +{ + impl->flush(); +} + +bool Session::fetch(Message& message, qpid::sys::Duration timeout) +{ + return impl->fetch(message, timeout); +} + +Message Session::fetch(qpid::sys::Duration timeout) +{ + return impl->fetch(timeout); +} + +bool Session::dispatch(qpid::sys::Duration timeout) +{ + return impl->dispatch(timeout); +} + +void* Session::getLastConfirmedSent() +{ + return impl->getLastConfirmedSent(); +} + +void* Session::getLastConfirmedAcknowledged() +{ + return impl->getLastConfirmedAcknowledged(); +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/SessionImpl.h b/qpid/cpp/src/qpid/messaging/SessionImpl.h new file mode 100644 index 0000000000..9b122a24bc --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/SessionImpl.h @@ -0,0 +1,65 @@ +#ifndef QPID_MESSAGING_SESSIONIMPL_H +#define QPID_MESSAGING_SESSIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/RefCounted.h" +#include <string> +#include "qpid/messaging/Variant.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace client { +} + +namespace messaging { + +struct Address; +struct Filter; +class Message; +class Sender; +class Receiver; + +class SessionImpl : public virtual qpid::RefCounted +{ + public: + virtual ~SessionImpl() {} + virtual void commit() = 0; + virtual void rollback() = 0; + virtual void acknowledge() = 0; + virtual void reject(Message&) = 0; + virtual void close() = 0; + virtual void sync() = 0; + virtual void flush() = 0; + virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0; + virtual Message fetch(qpid::sys::Duration timeout) = 0; + virtual bool dispatch(qpid::sys::Duration timeout) = 0; + virtual Address createTempQueue(const std::string& baseName) = 0; + virtual Sender createSender(const Address& address, const VariantMap& options) = 0; + virtual Receiver createReceiver(const Address& address, const VariantMap& options) = 0; + virtual Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options) = 0; + virtual void* getLastConfirmedSent() = 0; + virtual void* getLastConfirmedAcknowledged() = 0; + private: +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_SESSIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Variant.cpp b/qpid/cpp/src/qpid/messaging/Variant.cpp new file mode 100644 index 0000000000..4e37134b39 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Variant.cpp @@ -0,0 +1,603 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Variant.h" +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace client { +} + +namespace messaging { + +InvalidConversion::InvalidConversion(const std::string& msg) : Exception(msg) {} + + +namespace { +std::string EMPTY; +} + +class VariantImpl +{ + public: + VariantImpl(); + VariantImpl(bool); + VariantImpl(uint8_t); + VariantImpl(uint16_t); + VariantImpl(uint32_t); + VariantImpl(uint64_t); + VariantImpl(int8_t); + VariantImpl(int16_t); + VariantImpl(int32_t); + VariantImpl(int64_t); + VariantImpl(float); + VariantImpl(double); + VariantImpl(const std::string&); + VariantImpl(const Variant::Map&); + VariantImpl(const Variant::List&); + ~VariantImpl(); + + VariantType getType() const; + + bool asBool() const; + uint8_t asUint8() const; + uint16_t asUint16() const; + uint32_t asUint32() const; + uint64_t asUint64() const; + int8_t asInt8() const; + int16_t asInt16() const; + int32_t asInt32() const; + int64_t asInt64() const; + float asFloat() const; + double asDouble() const; + std::string asString() const; + + const Variant::Map& asMap() const; + Variant::Map& asMap(); + const Variant::List& asList() const; + Variant::List& asList(); + + const std::string& getString() const; + std::string& getString(); + + void setEncoding(const std::string&); + const std::string& getEncoding() const; + + static VariantImpl* create(const Variant&); + private: + const VariantType type; + union { + bool b; + uint8_t ui8; + uint16_t ui16; + uint32_t ui32; + uint64_t ui64; + int8_t i8; + int16_t i16; + int32_t i32; + int64_t i64; + float f; + double d; + void* v;//variable width data + } value; + std::string encoding;//optional encoding for variable length data + + std::string getTypeName(VariantType type) const; + template<class T> T convertFromString() const + { + std::string* s = reinterpret_cast<std::string*>(value.v); + try { + return boost::lexical_cast<T>(*s); + } catch(const boost::bad_lexical_cast&) { + throw InvalidConversion(QPID_MSG("Cannot convert " << *s)); + } + } +}; + + +VariantImpl::VariantImpl() : type(VAR_VOID) { value.i64 = 0; } +VariantImpl::VariantImpl(bool b) : type(VAR_BOOL) { value.b = b; } +VariantImpl::VariantImpl(uint8_t i) : type(VAR_UINT8) { value.ui8 = i; } +VariantImpl::VariantImpl(uint16_t i) : type(VAR_UINT16) { value.ui16 = i; } +VariantImpl::VariantImpl(uint32_t i) : type(VAR_UINT32) { value.ui32 = i; } +VariantImpl::VariantImpl(uint64_t i) : type(VAR_UINT64) { value.ui64 = i; } +VariantImpl::VariantImpl(int8_t i) : type(VAR_INT8) { value.i8 = i; } +VariantImpl::VariantImpl(int16_t i) : type(VAR_INT16) { value.i16 = i; } +VariantImpl::VariantImpl(int32_t i) : type(VAR_INT32) { value.i32 = i; } +VariantImpl::VariantImpl(int64_t i) : type(VAR_INT64) { value.i64 = i; } +VariantImpl::VariantImpl(float f) : type(VAR_FLOAT) { value.f = f; } +VariantImpl::VariantImpl(double d) : type(VAR_DOUBLE) { value.d = d; } +VariantImpl::VariantImpl(const std::string& s) : type(VAR_STRING) { value.v = new std::string(s); } +VariantImpl::VariantImpl(const Variant::Map& m) : type(VAR_MAP) { value.v = new Variant::Map(m); } +VariantImpl::VariantImpl(const Variant::List& l) : type(VAR_LIST) { value.v = new Variant::List(l); } + +VariantImpl::~VariantImpl() { + switch (type) { + case VAR_STRING: + delete reinterpret_cast<std::string*>(value.v); + break; + case VAR_MAP: + delete reinterpret_cast<Variant::Map*>(value.v); + break; + case VAR_LIST: + delete reinterpret_cast<Variant::List*>(value.v); + break; + default: + break; + } +} + +VariantType VariantImpl::getType() const { return type; } + +namespace { + +bool same_char(char a, char b) +{ + return toupper(a) == toupper(b); +} + +bool caseInsensitiveMatch(const std::string& a, const std::string& b) +{ + return a.size() == b.size() && std::equal(a.begin(), a.end(), b.begin(), &same_char); +} + +const std::string TRUE("True"); +const std::string FALSE("False"); + +bool toBool(const std::string& s) +{ + if (caseInsensitiveMatch(s, TRUE)) return true; + if (caseInsensitiveMatch(s, FALSE)) return false; + try { return boost::lexical_cast<int>(s); } catch(const boost::bad_lexical_cast&) {} + throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool")); +} + +} + +bool VariantImpl::asBool() const +{ + switch(type) { + case VAR_VOID: return false; + case VAR_BOOL: return value.b; + case VAR_UINT8: return value.ui8; + case VAR_UINT16: return value.ui16; + case VAR_UINT32: return value.ui32; + case VAR_UINT64: return value.ui64; + case VAR_INT8: return value.i8; + case VAR_INT16: return value.i16; + case VAR_INT32: return value.i32; + case VAR_INT64: return value.i64; + case VAR_STRING: return toBool(*reinterpret_cast<std::string*>(value.v)); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_BOOL))); + } +} +uint8_t VariantImpl::asUint8() const +{ + switch(type) { + case VAR_UINT8: return value.ui8; + case VAR_STRING: return convertFromString<uint8_t>(); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_UINT8))); + } +} +uint16_t VariantImpl::asUint16() const +{ + switch(type) { + case VAR_UINT8: return value.ui8; + case VAR_UINT16: return value.ui16; + case VAR_STRING: return convertFromString<uint16_t>(); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_UINT16))); + } +} +uint32_t VariantImpl::asUint32() const +{ + switch(type) { + case VAR_UINT8: return value.ui8; + case VAR_UINT16: return value.ui16; + case VAR_UINT32: return value.ui32; + case VAR_STRING: return convertFromString<uint32_t>(); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_UINT32))); + } +} +uint64_t VariantImpl::asUint64() const +{ + switch(type) { + case VAR_UINT8: return value.ui8; + case VAR_UINT16: return value.ui16; + case VAR_UINT32: return value.ui32; + case VAR_UINT64: return value.ui64; + case VAR_STRING: return convertFromString<uint64_t>(); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_UINT64))); + } +} +int8_t VariantImpl::asInt8() const +{ + switch(type) { + case VAR_INT8: return value.i8; + case VAR_STRING: return convertFromString<int8_t>(); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_INT8))); + } +} +int16_t VariantImpl::asInt16() const +{ + switch(type) { + case VAR_INT8: return value.i8; + case VAR_INT16: return value.i16; + case VAR_STRING: return convertFromString<int16_t>(); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_INT16))); + } +} +int32_t VariantImpl::asInt32() const +{ + switch(type) { + case VAR_INT8: return value.i8; + case VAR_INT16: return value.i16; + case VAR_INT32: return value.i32; + case VAR_STRING: return convertFromString<int32_t>(); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_INT32))); + } +} +int64_t VariantImpl::asInt64() const +{ + switch(type) { + case VAR_INT8: return value.i8; + case VAR_INT16: return value.i16; + case VAR_INT32: return value.i32; + case VAR_INT64: return value.i64; + case VAR_STRING: return convertFromString<int64_t>(); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_INT64))); + } +} +float VariantImpl::asFloat() const +{ + switch(type) { + case VAR_FLOAT: return value.f; + case VAR_STRING: return convertFromString<float>(); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_FLOAT))); + } +} +double VariantImpl::asDouble() const +{ + switch(type) { + case VAR_FLOAT: return value.f; + case VAR_DOUBLE: return value.d; + case VAR_STRING: return convertFromString<double>(); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_DOUBLE))); + } +} +std::string VariantImpl::asString() const +{ + switch(type) { + case VAR_VOID: return EMPTY; + case VAR_BOOL: return value.b ? TRUE : FALSE; + case VAR_UINT8: return boost::lexical_cast<std::string>((int) value.ui8); + case VAR_UINT16: return boost::lexical_cast<std::string>(value.ui16); + case VAR_UINT32: return boost::lexical_cast<std::string>(value.ui32); + case VAR_UINT64: return boost::lexical_cast<std::string>(value.ui64); + case VAR_INT8: return boost::lexical_cast<std::string>((int) value.i8); + case VAR_INT16: return boost::lexical_cast<std::string>(value.i16); + case VAR_INT32: return boost::lexical_cast<std::string>(value.i32); + case VAR_INT64: return boost::lexical_cast<std::string>(value.i64); + case VAR_DOUBLE: return boost::lexical_cast<std::string>(value.d); + case VAR_FLOAT: return boost::lexical_cast<std::string>(value.f); + case VAR_STRING: return *reinterpret_cast<std::string*>(value.v); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_STRING))); + } +} + +const Variant::Map& VariantImpl::asMap() const +{ + switch(type) { + case VAR_MAP: return *reinterpret_cast<Variant::Map*>(value.v); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_MAP))); + } +} + +Variant::Map& VariantImpl::asMap() +{ + switch(type) { + case VAR_MAP: return *reinterpret_cast<Variant::Map*>(value.v); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_MAP))); + } +} + +const Variant::List& VariantImpl::asList() const +{ + switch(type) { + case VAR_LIST: return *reinterpret_cast<Variant::List*>(value.v); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_LIST))); + } +} + +Variant::List& VariantImpl::asList() +{ + switch(type) { + case VAR_LIST: return *reinterpret_cast<Variant::List*>(value.v); + default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(VAR_LIST))); + } +} + +std::string& VariantImpl::getString() +{ + switch(type) { + case VAR_STRING: return *reinterpret_cast<std::string*>(value.v); + default: throw InvalidConversion(QPID_MSG("Variant is not a string; use asString() if conversion is required.")); + } +} + +const std::string& VariantImpl::getString() const +{ + switch(type) { + case VAR_STRING: return *reinterpret_cast<std::string*>(value.v); + default: throw InvalidConversion(QPID_MSG("Variant is not a string; use asString() if conversion is required.")); + } +} + +void VariantImpl::setEncoding(const std::string& s) { encoding = s; } +const std::string& VariantImpl::getEncoding() const { return encoding; } + +std::string VariantImpl::getTypeName(VariantType type) const +{ + switch (type) { + case VAR_VOID: return "void"; + case VAR_BOOL: return "bool"; + case VAR_UINT8: return "uint8"; + case VAR_UINT16: return "uint16"; + case VAR_UINT32: return "uint32"; + case VAR_UINT64: return "uint64"; + case VAR_INT8: return "int8"; + case VAR_INT16: return "int16"; + case VAR_INT32: return "int32"; + case VAR_INT64: return "int64"; + case VAR_FLOAT: return "float"; + case VAR_DOUBLE: return "double"; + case VAR_STRING: return "string"; + case VAR_MAP: return "map"; + case VAR_LIST: return "list"; + } + return "<unknown>";//should never happen +} + +VariantImpl* VariantImpl::create(const Variant& v) +{ + switch (v.getType()) { + case VAR_BOOL: return new VariantImpl(v.asBool()); + case VAR_UINT8: return new VariantImpl(v.asUint8()); + case VAR_UINT16: return new VariantImpl(v.asUint16()); + case VAR_UINT32: return new VariantImpl(v.asUint32()); + case VAR_UINT64: return new VariantImpl(v.asUint64()); + case VAR_INT8: return new VariantImpl(v.asInt8()); + case VAR_INT16: return new VariantImpl(v.asInt16()); + case VAR_INT32: return new VariantImpl(v.asInt32()); + case VAR_INT64: return new VariantImpl(v.asInt64()); + case VAR_FLOAT: return new VariantImpl(v.asFloat()); + case VAR_DOUBLE: return new VariantImpl(v.asDouble()); + case VAR_STRING: return new VariantImpl(v.asString()); + case VAR_MAP: return new VariantImpl(v.asMap()); + case VAR_LIST: return new VariantImpl(v.asList()); + default: return new VariantImpl(); + } +} + +Variant::Variant() : impl(new VariantImpl()) {} +Variant::Variant(bool b) : impl(new VariantImpl(b)) {} +Variant::Variant(uint8_t i) : impl(new VariantImpl(i)) {} +Variant::Variant(uint16_t i) : impl(new VariantImpl(i)) {} +Variant::Variant(uint32_t i) : impl(new VariantImpl(i)) {} +Variant::Variant(uint64_t i) : impl(new VariantImpl(i)) {} +Variant::Variant(int8_t i) : impl(new VariantImpl(i)) {} +Variant::Variant(int16_t i) : impl(new VariantImpl(i)) {} +Variant::Variant(int32_t i) : impl(new VariantImpl(i)) {} +Variant::Variant(int64_t i) : impl(new VariantImpl(i)) {} +Variant::Variant(float f) : impl(new VariantImpl(f)) {} +Variant::Variant(double d) : impl(new VariantImpl(d)) {} +Variant::Variant(const std::string& s) : impl(new VariantImpl(s)) {} +Variant::Variant(const char* s) : impl(new VariantImpl(std::string(s))) {} +Variant::Variant(const Map& m) : impl(new VariantImpl(m)) {} +Variant::Variant(const List& l) : impl(new VariantImpl(l)) {} +Variant::Variant(const Variant& v) : impl(VariantImpl::create(v)) {} + +Variant::~Variant() { if (impl) delete impl; } + +void Variant::reset() +{ + if (impl) delete impl; + impl = new VariantImpl(); +} + + +Variant& Variant::operator=(bool b) +{ + if (impl) delete impl; + impl = new VariantImpl(b); + return *this; +} + +Variant& Variant::operator=(uint8_t i) +{ + if (impl) delete impl; + impl = new VariantImpl(i); + return *this; +} +Variant& Variant::operator=(uint16_t i) +{ + if (impl) delete impl; + impl = new VariantImpl(i); + return *this; +} +Variant& Variant::operator=(uint32_t i) +{ + if (impl) delete impl; + impl = new VariantImpl(i); + return *this; +} +Variant& Variant::operator=(uint64_t i) +{ + if (impl) delete impl; + impl = new VariantImpl(i); + return *this; +} + +Variant& Variant::operator=(int8_t i) +{ + if (impl) delete impl; + impl = new VariantImpl(i); + return *this; +} +Variant& Variant::operator=(int16_t i) +{ + if (impl) delete impl; + impl = new VariantImpl(i); + return *this; +} +Variant& Variant::operator=(int32_t i) +{ + if (impl) delete impl; + impl = new VariantImpl(i); + return *this; +} +Variant& Variant::operator=(int64_t i) +{ + if (impl) delete impl; + impl = new VariantImpl(i); + return *this; +} + +Variant& Variant::operator=(float f) +{ + if (impl) delete impl; + impl = new VariantImpl(f); + return *this; +} +Variant& Variant::operator=(double d) +{ + if (impl) delete impl; + impl = new VariantImpl(d); + return *this; +} + +Variant& Variant::operator=(const std::string& s) +{ + if (impl) delete impl; + impl = new VariantImpl(s); + return *this; +} + +Variant& Variant::operator=(const char* s) +{ + if (impl) delete impl; + impl = new VariantImpl(std::string(s)); + return *this; +} + +Variant& Variant::operator=(const Map& m) +{ + if (impl) delete impl; + impl = new VariantImpl(m); + return *this; +} + +Variant& Variant::operator=(const List& l) +{ + if (impl) delete impl; + impl = new VariantImpl(l); + return *this; +} + +Variant& Variant::operator=(const Variant& v) +{ + if (impl) delete impl; + impl = VariantImpl::create(v); + return *this; +} + +VariantType Variant::getType() const { return impl->getType(); } +bool Variant::asBool() const { return impl->asBool(); } +uint8_t Variant::asUint8() const { return impl->asUint8(); } +uint16_t Variant::asUint16() const { return impl->asUint16(); } +uint32_t Variant::asUint32() const { return impl->asUint32(); } +uint64_t Variant::asUint64() const { return impl->asUint64(); } +int8_t Variant::asInt8() const { return impl->asInt8(); } +int16_t Variant::asInt16() const { return impl->asInt16(); } +int32_t Variant::asInt32() const { return impl->asInt32(); } +int64_t Variant::asInt64() const { return impl->asInt64(); } +float Variant::asFloat() const { return impl->asFloat(); } +double Variant::asDouble() const { return impl->asDouble(); } +std::string Variant::asString() const { return impl->asString(); } +const Variant::Map& Variant::asMap() const { return impl->asMap(); } +Variant::Map& Variant::asMap() { return impl->asMap(); } +const Variant::List& Variant::asList() const { return impl->asList(); } +Variant::List& Variant::asList() { return impl->asList(); } +const std::string& Variant::getString() const { return impl->getString(); } +std::string& Variant::getString() { return impl->getString(); } +void Variant::setEncoding(const std::string& s) { impl->setEncoding(s); } +const std::string& Variant::getEncoding() const { return impl->getEncoding(); } + +Variant::operator bool() const { return asBool(); } +Variant::operator uint8_t() const { return asUint8(); } +Variant::operator uint16_t() const { return asUint16(); } +Variant::operator uint32_t() const { return asUint32(); } +Variant::operator uint64_t() const { return asUint64(); } +Variant::operator int8_t() const { return asInt8(); } +Variant::operator int16_t() const { return asInt16(); } +Variant::operator int32_t() const { return asInt32(); } +Variant::operator int64_t() const { return asInt64(); } +Variant::operator float() const { return asFloat(); } +Variant::operator double() const { return asDouble(); } +Variant::operator const char*() const { return asString().c_str(); } + +std::ostream& operator<<(std::ostream& out, const Variant::Map& map) +{ + for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { + if (i != map.begin()) out << ", "; + out << i->first << ":" << i->second; + } + return out; +} + +std::ostream& operator<<(std::ostream& out, const Variant::List& list) +{ + for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + if (i != list.begin()) out << ", "; + out << *i; + } + return out; +} + +std::ostream& operator<<(std::ostream& out, const Variant& value) +{ + switch (value.getType()) { + case VAR_MAP: + out << "{" << value.asMap() << "}"; + break; + case VAR_LIST: + out << "[" << value.asList() << "]"; + break; + case VAR_VOID: + out << "<void>"; + break; + default: + out << value.asString(); + break; + } + return out; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/sys/Timer.cpp b/qpid/cpp/src/qpid/sys/Timer.cpp index d569fb3bb7..3d2a900455 100644 --- a/qpid/cpp/src/qpid/sys/Timer.cpp +++ b/qpid/cpp/src/qpid/sys/Timer.cpp @@ -84,6 +84,7 @@ Timer::~Timer() stop(); } +// TODO AStitcher 21/08/09 The threshholds for emitting warnings are a little arbitrary void Timer::run() { Monitor::ScopedLock l(monitor); @@ -111,8 +112,8 @@ void Timer::run() // Warn on callback overrun AbsTime end(AbsTime::now()); Duration overrun(tasks.top()->nextFireTime, end); - bool late = delay > 1 * TIME_MSEC; - bool overran = overrun > 1 * TIME_MSEC; + bool late = delay > 10 * TIME_MSEC; + bool overran = overrun > 2 * TIME_MSEC; if (late) if (overran) { QPID_LOG(warning, diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 5d1ac6d034..8545ebd9cb 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -305,6 +305,13 @@ private: * thread processing this handle. */ volatile bool writePending; + /** + * This records whether we've been reading is flow controlled: + * it's safe as a simple boolean as the only way to be stopped + * is in calls only allowed in the callback context, the only calls + * checking it are also in calls only allowed in callback context. + */ + volatile bool readingStopped; }; AsynchIO::AsynchIO(const Socket& s, @@ -323,7 +330,8 @@ AsynchIO::AsynchIO(const Socket& s, idleCallback(iCb), socket(s), queuedClose(false), - writePending(false) { + writePending(false), + readingStopped(false) { s.setNonblocking(); } @@ -351,8 +359,11 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) { assert(buff); buff->dataStart = 0; buff->dataCount = 0; + + bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_back(buff); - DispatchHandle::rewatchRead(); + if (queueWasEmpty && !readingStopped) + DispatchHandle::rewatchRead(); } void AsynchIO::unread(BufferBase* buff) { @@ -361,15 +372,18 @@ void AsynchIO::unread(BufferBase* buff) { memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); buff->dataStart = 0; } + + bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_front(buff); - DispatchHandle::rewatchRead(); + if (queueWasEmpty && !readingStopped) + DispatchHandle::rewatchRead(); } void AsynchIO::queueWrite(BufferBase* buff) { assert(buff); // If we've already closed the socket then throw the write away if (queuedClose) { - bufferQueue.push_front(buff); + queueReadBuffer(buff); return; } else { writeQueue.push_front(buff); @@ -378,6 +392,7 @@ void AsynchIO::queueWrite(BufferBase* buff) { DispatchHandle::rewatchWrite(); } +// This can happen outside the callback context void AsynchIO::notifyPendingWrite() { writePending = true; DispatchHandle::rewatchWrite(); @@ -392,11 +407,14 @@ bool AsynchIO::writeQueueEmpty() { return writeQueue.empty(); } +// This can happen outside the callback context void AsynchIO::startReading() { + readingStopped = false; DispatchHandle::rewatchRead(); } void AsynchIO::stopReading() { + readingStopped = true; DispatchHandle::unwatchRead(); } diff --git a/qpid/cpp/src/qpid/sys/uuid.h b/qpid/cpp/src/qpid/sys/uuid.h new file mode 100644 index 0000000000..804ab34463 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/uuid.h @@ -0,0 +1,28 @@ +#ifndef _sys_uuid_h +#define _sys_uuid_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifdef _WIN32 +# include "qpid/sys/windows/uuid.h" +#else +# include <uuid/uuid.h> +#endif /* _WIN32 */ + +#endif /* _sys_uuid_h */ diff --git a/qpid/cpp/src/qpid/sys/windows/uuid.h b/qpid/cpp/src/qpid/sys/windows/uuid.h new file mode 100644 index 0000000000..c79abe95c6 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/uuid.h @@ -0,0 +1,38 @@ +#ifndef _sys_windows_uuid_h +#define _sys_windows_uuid_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/CommonImportExport.h" +#include <qpid/sys/IntegerTypes.h> + +namespace qpid { namespace sys { const size_t UuidSize = 16; }} +typedef uint8_t uuid_t[qpid::sys::UuidSize]; + +QPID_COMMON_EXTERN void uuid_clear (uuid_t uu); +QPID_COMMON_EXTERN void uuid_copy (uuid_t dst, const uuid_t src); +QPID_COMMON_EXTERN void uuid_generate (uuid_t out); +QPID_COMMON_EXTERN int uuid_is_null (const uuid_t uu); // Returns 1 if null, else 0 +QPID_COMMON_EXTERN int uuid_parse (const char *in, uuid_t uu); // Returns 0 on success, else -1 +QPID_COMMON_EXTERN void uuid_unparse (const uuid_t uu, char *out); + +#endif /*!_sys_windows_uuid_h*/ |