/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "Session.h" #include "Incoming.h" #include "Outgoing.h" #include "Message.h" #include "Connection.h" #include "Domain.h" #include "Exception.h" #include "Interconnects.h" #include "Relay.h" #include "Topic.h" #include "qpid/amqp/descriptors.h" #include "qpid/broker/Broker.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/TopicExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueCursor.h" #include "qpid/broker/Selector.h" #include "qpid/broker/TopicExchange.h" #include "qpid/broker/amqp/Filter.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/Codecs.h" #include #include #include #include extern "C" { #include } namespace qpid { namespace broker { namespace amqp { namespace { pn_bytes_t convert(const std::string& s) { pn_bytes_t result; result.start = const_cast(s.data()); result.size = s.size(); return result; } std::string convert(pn_bytes_t in) { return std::string(in.start, in.size); } //capabilities const std::string CREATE_ON_DEMAND("create-on-demand"); const std::string DURABLE("durable"); const std::string QUEUE("queue"); const std::string TOPIC("topic"); const std::string DIRECT_FILTER("legacy-amqp-direct-binding"); const std::string TOPIC_FILTER("legacy-amqp-topic-binding"); const std::string SHARED("shared"); void writeCapabilities(pn_data_t* out, const std::vector& supported) { if (supported.size() == 1) { pn_data_put_symbol(out, convert(supported.front())); } else if (supported.size() > 1) { pn_data_put_array(out, false, PN_SYMBOL); pn_data_enter(out); for (std::vector::const_iterator i = supported.begin(); i != supported.end(); ++i) { pn_data_put_symbol(out, convert(*i)); } pn_data_exit(out); } } template void readCapabilities(pn_data_t* data, F f) { pn_data_rewind(data); if (pn_data_next(data)) { pn_type_t type = pn_data_type(data); if (type == PN_ARRAY) { pn_data_enter(data); while (pn_data_next(data)) { std::string s = convert(pn_data_get_symbol(data)); f(s); } pn_data_exit(data); } else if (type == PN_SYMBOL) { std::string s = convert(pn_data_get_symbol(data)); f(s); } else { QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type)); } } } void matchCapability(const std::string& name, bool* result, const std::string& s) { if (s == name) *result = true; } bool is_capability_requested(const std::string& name, pn_data_t* capabilities) { bool result(false); readCapabilities(capabilities, boost::bind(&matchCapability, name, &result, _1)); return result; } void collectQueueCapabilities(boost::shared_ptr node, std::vector* supported, const std::string& s) { if (s == DURABLE) { if (node->isDurable()) supported->push_back(s); } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) { supported->push_back(s); } } void collectExchangeCapabilities(boost::shared_ptr node, std::vector* supported, const std::string& s) { if (s == DURABLE) { if (node->isDurable()) supported->push_back(s); } else if (s == SHARED) { supported->push_back(s); } else if (s == CREATE_ON_DEMAND || s == TOPIC) { supported->push_back(s); } else if (s == DIRECT_FILTER) { if (node->getType() == DirectExchange::typeName) supported->push_back(s); } else if (s == TOPIC_FILTER) { if (node->getType() == TopicExchange::typeName) supported->push_back(s); } } void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr node) { std::vector supported; readCapabilities(in, boost::bind(&collectQueueCapabilities, node, &supported, _1)); writeCapabilities(out, supported); } void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr node) { std::vector supported; readCapabilities(in, boost::bind(&collectExchangeCapabilities, node, &supported, _1)); writeCapabilities(out, supported); } } class IncomingToQueue : public DecodingIncoming { public: IncomingToQueue(Broker& b, Session& p, boost::shared_ptr q, pn_link_t* l, const std::string& source, bool icl) : DecodingIncoming(l, b, p, source, q->getName(), pn_link_name(l)), queue(q), isControllingLink(icl) { queue->markInUse(isControllingLink); } ~IncomingToQueue() { queue->releaseFromUse(isControllingLink); } void handle(qpid::broker::Message& m); private: boost::shared_ptr queue; bool isControllingLink; }; class IncomingToExchange : public DecodingIncoming { public: IncomingToExchange(Broker& b, Session& p, boost::shared_ptr e, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise()) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr exchange; Authorise& authorise; }; Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o) : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false), authorise(connection.getUserId(), connection.getBroker().getAcl()) {} Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming) { ResolvedNode node; node.exchange = connection.getBroker().getExchanges().find(name); node.queue = connection.getBroker().getQueues().find(name); node.topic = connection.getTopics().get(name); bool createOnDemand = is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus)); //Strictly speaking, properties should only be specified when the //terminus is dynamic. However we will not enforce that here. If //properties are set on the attach request, we will set them on //our reply. This allows the 'create' and 'assert' options in the //qpid messaging API to be implemented over 1.0. node.properties.read(pn_terminus_properties(terminus)); if (node.topic) node.exchange = node.topic->getExchange(); if (node.exchange && !node.queue && createOnDemand) { if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) { //emulate 0-10 exchange-declare behaviour throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists"); } } if (!node.queue && !node.exchange) { if (pn_terminus_is_dynamic(terminus) || createOnDemand) { //is it a queue or an exchange? if (node.properties.isQueue()) { node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first; } else { qpid::framing::FieldTable args; qpid::amqp_0_10::translate(node.properties.getProperties(), args); node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(), args, connection.getUserId(), connection.getId()).first; } } else { size_t i = name.find('@'); if (i != std::string::npos && (i+1) < name.length()) { std::string domain = name.substr(i+1); std::string local = name.substr(0, i); std::string id = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str(); //does this domain exist? boost::shared_ptr d = connection.getInterconnects().findDomain(domain); if (d) { node.relay = boost::shared_ptr(new Relay(1000)); if (incoming) { d->connect(false, id, name, local, connection, node.relay); } else { d->connect(true, id, local, name, connection, node.relay); } } } } } else if (node.queue && node.topic) { QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or topic, assuming topic"); node.queue.reset(); } else if (node.queue && node.exchange) { QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue"); node.exchange.reset(); } if (node.properties.isExclusive() && node.queue && node.queue->setExclusiveOwner(this)) { exclusiveQueues.insert(node.queue); } return node; } std::string Session::generateName(pn_link_t* link) { std::stringstream s; if (connection.getContainerId().empty()) { s << qpid::types::Uuid(true); } else { s << connection.getContainerId(); } s << "_" << pn_link_name(link); return s.str(); } std::string Session::qualifyName(const std::string& name) { if (connection.getDomain().empty()) { return name; } else { std::stringstream s; s << name << "@" << connection.getDomain(); return s.str(); } } void Session::attach(pn_link_t* link) { if (pn_link_is_sender(link)) { pn_terminus_t* source = pn_link_remote_source(link); //i.e a subscription std::string name; if (pn_terminus_get_type(source) == PN_UNSPECIFIED) { throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No source specified!"); } else if (pn_terminus_is_dynamic(source)) { name = generateName(link); QPID_LOG(debug, "Received attach request for outgoing link from " << name); pn_terminus_set_address(pn_link_source(link), qualifyName(name).c_str()); } else { name = pn_terminus_get_address(source); QPID_LOG(debug, "Received attach request for outgoing link from " << name); pn_terminus_set_address(pn_link_source(link), name.c_str()); } setupOutgoing(link, source, name); } else { pn_terminus_t* target = pn_link_remote_target(link); std::string name; if (pn_terminus_get_type(target) == PN_UNSPECIFIED) { throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!"); } else if (pn_terminus_is_dynamic(target)) { name = generateName(link); QPID_LOG(debug, "Received attach request for incoming link to " << name); pn_terminus_set_address(pn_link_target(link), qualifyName(name).c_str()); } else { name = pn_terminus_get_address(target); QPID_LOG(debug, "Received attach request for incoming link to " << name); pn_terminus_set_address(pn_link_target(link), name.c_str()); } setupIncoming(link, target, name); } } void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name) { ResolvedNode node = resolve(name, target, true); //set capabilities if (node.queue) { setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue); authorise.incoming(node.queue); node.properties.write(pn_terminus_properties(pn_link_target(link)), node.queue); } else if (node.exchange) { setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange); authorise.incoming(node.exchange); node.properties.write(pn_terminus_properties(pn_link_target(link)), node.exchange); } const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link)); if (!sourceAddress) { sourceAddress = pn_terminus_get_address(pn_link_source(link)); } std::string source; if (sourceAddress) { source = sourceAddress; } if (node.queue) { boost::shared_ptr q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.properties.trackControllingLink())); incoming[link] = q; } else if (node.exchange) { boost::shared_ptr e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source)); incoming[link] = e; } else if (node.relay) { boost::shared_ptr in(new IncomingToRelay(link, connection.getBroker(), *this, source, name, pn_link_name(link), node.relay)); incoming[link] = in; } else { pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); throw Exception(qpid::amqp::error_conditions::NOT_FOUND, std::string("Node not found: ") + name); } if (connection.getBroker().getOptions().auth && !connection.isLink()) incoming[link]->verify(connection.getUserId(), connection.getBroker().getOptions().realm); QPID_LOG(debug, "Incoming link attached"); } void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name) { ResolvedNode node = resolve(name, source, false); if (node.queue) { setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue); node.properties.write(pn_terminus_properties(pn_link_source(link)), node.queue); } else if (node.exchange) { setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange); node.properties.write(pn_terminus_properties(pn_link_source(link)), node.exchange); } Filter filter; filter.read(pn_terminus_filter(source)); const char* targetAddress = pn_terminus_get_address(pn_link_remote_target(link)); if (!targetAddress) { targetAddress = pn_terminus_get_address(pn_link_target(link)); } std::string target; if (targetAddress) { target = targetAddress; } if (node.queue) { authorise.outgoing(node.queue); SubscriptionType type = pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY ? BROWSER : CONSUMER; boost::shared_ptr q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.properties.trackControllingLink())); q->init(); filter.apply(q); outgoing[link] = q; } else if (node.exchange) { authorise.access(node.exchange);//do separate access check before trying to create the queue bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source)); bool durable = pn_terminus_get_durability(source); bool autodelete = !durable && pn_link_remote_snd_settle_mode(link) != PN_SND_UNSETTLED; QueueSettings settings(durable, autodelete); std::string altExchange; if (node.topic) { settings = node.topic->getPolicy(); settings.durable = durable; settings.autodelete = autodelete; altExchange = node.topic->getAlternateExchange(); } settings.autoDeleteDelay = pn_terminus_get_timeout(source); if (settings.autoDeleteDelay) { settings.autodelete = true; settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay; } filter.configure(settings); std::stringstream queueName; if (shared) { //just use link name (TODO: could allow this to be //overridden when access to link properties is provided //(PROTON-335)) queueName << pn_link_name(link); } else { //combination of container id and link name is unique queueName << connection.getContainerId() << "_" << pn_link_name(link); } boost::shared_ptr queue = connection.getBroker().createQueue(queueName.str(), settings, this, altExchange, connection.getUserId(), connection.getId()).first; if (!shared) queue->setExclusiveOwner(this); authorise.outgoing(node.exchange, queue, filter); filter.bind(node.exchange, queue); boost::shared_ptr q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, CONSUMER, !shared, false)); q->init(); outgoing[link] = q; } else if (node.relay) { boost::shared_ptr out(new OutgoingFromRelay(link, connection.getBroker(), *this, name, target, pn_link_name(link), node.relay)); outgoing[link] = out; out->init(); } else { pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED); throw Exception(qpid::amqp::error_conditions::NOT_FOUND, std::string("Node not found: ") + name);/*not-found*/ } filter.write(pn_terminus_filter(pn_link_source(link))); QPID_LOG(debug, "Outgoing link attached"); } /** * Called for links initiated by the broker */ void Session::attach(pn_link_t* link, const std::string& src, const std::string& tgt, boost::shared_ptr relay) { pn_terminus_t* source = pn_link_source(link); pn_terminus_t* target = pn_link_target(link); pn_terminus_set_address(source, src.c_str()); pn_terminus_set_address(target, tgt.c_str()); if (relay) { if (pn_link_is_sender(link)) { boost::shared_ptr out(new OutgoingFromRelay(link, connection.getBroker(), *this, src, tgt, pn_link_name(link), relay)); outgoing[link] = out; out->init(); } else { boost::shared_ptr in(new IncomingToRelay(link, connection.getBroker(), *this, src, tgt, pn_link_name(link), relay)); incoming[link] = in; } } else { if (pn_link_is_sender(link)) { setupOutgoing(link, source, src); } else { setupIncoming(link, target, tgt); } } } void Session::detach(pn_link_t* link) { if (pn_link_is_sender(link)) { OutgoingLinks::iterator i = outgoing.find(link); if (i != outgoing.end()) { i->second->detached(); boost::shared_ptr q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get()); if (q) connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId()); outgoing.erase(i); QPID_LOG(debug, "Outgoing link detached"); } } else { IncomingLinks::iterator i = incoming.find(link); if (i != incoming.end()) { i->second->detached(); incoming.erase(i); QPID_LOG(debug, "Incoming link detached"); } } } void Session::accepted(pn_delivery_t* delivery, bool sync) { if (sync) { //this is on IO thread pn_delivery_update(delivery, PN_ACCEPTED); pn_delivery_settle(delivery);//do we need to check settlement modes/orders? incomingMessageAccepted(); } else { //this is not on IO thread, need to delay processing until on IO thread qpid::sys::Mutex::ScopedLock l(lock); if (!deleted) { completed.push_back(delivery); out.activateOutput(); } } } void Session::readable(pn_link_t* link, pn_delivery_t* delivery) { pn_delivery_tag_t tag = pn_delivery_tag(delivery); QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size)); incomingMessageReceived(); IncomingLinks::iterator target = incoming.find(link); if (target == incoming.end()) { QPID_LOG(error, "Received message on unknown link"); pn_delivery_update(delivery, PN_REJECTED); pn_delivery_settle(delivery);//do we need to check settlement modes/orders? incomingMessageRejected(); } else { target->second->readable(delivery); if (target->second->haveWork()) out.activateOutput(); } } void Session::writable(pn_link_t* link, pn_delivery_t* delivery) { OutgoingLinks::iterator sender = outgoing.find(link); if (sender == outgoing.end()) { QPID_LOG(error, "Delivery returned for unknown link"); } else { sender->second->handle(delivery); } } bool Session::dispatch() { bool output(false); for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) { try { if (s->second->doWork()) output = true; ++s; } catch (const Exception& e) { pn_condition_t* error = pn_link_condition(s->first); pn_condition_set_name(error, e.symbol()); pn_condition_set_description(error, e.what()); pn_link_close(s->first); s->second->detached(); outgoing.erase(s++); output = true; } } if (completed.size()) { output = true; std::deque copy; { qpid::sys::Mutex::ScopedLock l(lock); completed.swap(copy); } for (std::deque::iterator i = copy.begin(); i != copy.end(); ++i) { accepted(*i, true); } } for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end();) { try { if (i->second->doWork()) output = true; ++i; } catch (const Exception& e) { pn_condition_t* error = pn_link_condition(i->first); pn_condition_set_name(error, e.symbol()); pn_condition_set_description(error, e.what()); pn_link_close(i->first); i->second->detached(); incoming.erase(i++); output = true; } } return output; } void Session::close() { for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { i->second->detached(); } for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) { i->second->detached(); } outgoing.clear(); incoming.clear(); QPID_LOG(debug, "Session " << session << " closed, all links detached."); for (std::set< boost::shared_ptr >::const_iterator i = exclusiveQueues.begin(); i != exclusiveQueues.end(); ++i) { (*i)->releaseExclusiveOwnership(); } exclusiveQueues.clear(); qpid::sys::Mutex::ScopedLock l(lock); deleted = true; } void Session::wakeup() { out.activateOutput(); } Authorise& Session::getAuthorise() { return authorise; } void IncomingToQueue::handle(qpid::broker::Message& message) { if (queue->isDeleted()) { std::stringstream msg; msg << " Queue " << queue->getName() << " has been deleted"; throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, msg.str()); } queue->deliver(message); } void IncomingToExchange::handle(qpid::broker::Message& message) { authorise.route(exchange, message); DeliverableMessage deliverable(message, 0); exchange->route(deliverable); if (!deliverable.delivered) { if (exchange->getAlternate()) { exchange->getAlternate()->route(deliverable); } } } }}} // namespace qpid::broker::amqp