diff options
Diffstat (limited to 'cpp/src/qpid/broker/amqp')
27 files changed, 3403 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/amqp/Connection.cpp b/cpp/src/qpid/broker/amqp/Connection.cpp new file mode 100644 index 0000000000..1f135cf931 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Connection.cpp @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Connection.h" +#include "Session.h" +#include "qpid/Exception.h" +#include "qpid/broker/Broker.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/OutputControl.h" +#include <sstream> +extern "C" { +#include <proton/engine.h> +#include <proton/error.h> +} + +namespace qpid { +namespace broker { +namespace amqp { + +Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, bool saslInUse) + : ManagedConnection(b, i), + connection(pn_connection()), + transport(pn_transport()), + out(o), id(i), broker(b), haveOutput(true) +{ + if (pn_transport_bind(transport, connection)) { + //error + } + out.activateOutput(); + bool enableTrace(false); + QPID_LOG_TEST_CAT(trace, protocol, enableTrace); + if (enableTrace) pn_transport_trace(transport, PN_TRACE_FRM); + + if (!saslInUse) { + //feed in a dummy AMQP 1.0 header as engine expects one, but + //we already read it (if sasl is in use we read the sasl + //header,not the AMQP 1.0 header). + std::vector<char> protocolHeader(8); + qpid::framing::ProtocolInitiation pi(getVersion()); + qpid::framing::Buffer buffer(&protocolHeader[0], protocolHeader.size()); + pi.encode(buffer); + pn_transport_input(transport, &protocolHeader[0], protocolHeader.size()); + + //wont get a userid, so set a dummy one on the ManagedConnection to trigger event + setUserid("no authentication used"); + } +} + + +Connection::~Connection() +{ + + pn_transport_free(transport); + pn_connection_free(connection); +} + +pn_transport_t* Connection::getTransport() +{ + return transport; +} +size_t Connection::decode(const char* buffer, size_t size) +{ + QPID_LOG(trace, id << " decode(" << size << ")") + //TODO: Fix pn_engine_input() to take const buffer + ssize_t n = pn_transport_input(transport, const_cast<char*>(buffer), size); + if (n > 0 || n == PN_EOS) { + //If engine returns EOS, have no way of knowing how many bytes + //it processed, but can assume none need to be reprocessed so + //consider them all read: + if (n == PN_EOS) n = size; + QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size) + process(); + pn_transport_tick(transport, 0); + if (!haveOutput) { + haveOutput = true; + out.activateOutput(); + } + return n; + } else if (n == PN_ERR) { + throw qpid::Exception(QPID_MSG("Error on input: " << getError())); + } else { + return 0; + } +} + +size_t Connection::encode(char* buffer, size_t size) +{ + QPID_LOG(trace, "encode(" << size << ")") + ssize_t n = pn_transport_output(transport, buffer, size); + if (n > 0) { + QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) + haveOutput = true; + return n; + } else if (n == PN_EOS) { + haveOutput = size; + return size;//Is this right? + } else if (n == PN_ERR) { + throw qpid::Exception(QPID_MSG("Error on output: " << getError())); + } else { + haveOutput = false; + return 0; + } +} +bool Connection::canEncode() +{ + for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) { + if (i->second->dispatch()) haveOutput = true; + } + process(); + //TODO: proper handling of time in and out of tick + pn_transport_tick(transport, 0); + QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput) + return haveOutput; +} +void Connection::closed() +{ + //TODO: tear down sessions and associated links + for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { + i->second->close(); + } +} +bool Connection::isClosed() const +{ + return pn_connection_state(connection) & PN_REMOTE_CLOSED; +} +framing::ProtocolVersion Connection::getVersion() const +{ + return qpid::framing::ProtocolVersion(1,0); +} +namespace { +pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE; +pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; +} + +void Connection::process() +{ + QPID_LOG(trace, id << " process()"); + if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) { + QPID_LOG_CAT(debug, model, id << " connection opened"); + pn_connection_set_container(connection, broker.getFederationTag().c_str()); + pn_connection_open(connection); + } + + for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) { + QPID_LOG_CAT(debug, model, id << " session begun"); + pn_session_open(s); + boost::shared_ptr<Session> ssn(new Session(s, broker, *this, out)); + sessions[s] = ssn; + } + for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) { + pn_link_open(l); + + Sessions::iterator session = sessions.find(pn_link_session(l)); + if (session == sessions.end()) { + QPID_LOG(error, id << " Link attached on unknown session!"); + } else { + try { + session->second->attach(l); + QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l)); + } catch (const std::exception& e) { + QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); + //TODO: set error details on detach when that is exposed via engine API + pn_link_close(l); + } + } + } + + //handle deliveries + for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) { + pn_link_t* link = pn_delivery_link(delivery); + if (pn_link_is_receiver(link)) { + Sessions::iterator i = sessions.find(pn_link_session(link)); + if (i != sessions.end()) { + i->second->incoming(link, delivery); + } else { + pn_delivery_update(delivery, PN_REJECTED); + } + } else { //i.e. SENDER + Sessions::iterator i = sessions.find(pn_link_session(link)); + if (i != sessions.end()) { + QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link)); + i->second->outgoing(link, delivery); + } else { + QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link); + } + } + } + + + for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) { + pn_link_close(l); + Sessions::iterator session = sessions.find(pn_link_session(l)); + if (session == sessions.end()) { + QPID_LOG(error, id << " peer attempted to detach link on unknown session!"); + } else { + session->second->detach(l); + QPID_LOG_CAT(debug, model, id << " link detached"); + } + } + for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) { + pn_session_close(s); + Sessions::iterator i = sessions.find(s); + if (i != sessions.end()) { + i->second->close(); + sessions.erase(i); + QPID_LOG_CAT(debug, model, id << " session ended"); + } else { + QPID_LOG(error, id << " peer attempted to close unrecognised session"); + } + } + if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + QPID_LOG_CAT(debug, model, id << " connection closed"); + pn_connection_close(connection); + } +} + +std::string Connection::getError() +{ + std::stringstream text; + pn_error_t* cerror = pn_connection_error(connection); + if (cerror) text << "connection error " << pn_error_text(cerror); + pn_error_t* terror = pn_transport_error(transport); + if (terror) text << "transport error " << pn_error_text(terror); + return text.str(); +} + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Connection.h b/cpp/src/qpid/broker/amqp/Connection.h new file mode 100644 index 0000000000..8af209af7a --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Connection.h @@ -0,0 +1,73 @@ +#ifndef QPID_BROKER_AMQP1_CONNECTION_H +#define QPID_BROKER_AMQP1_CONNECTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/broker/amqp/ManagedConnection.h" +#include <map> +#include <boost/shared_ptr.hpp> + +struct pn_connection_t; +struct pn_session_t; +struct pn_transport_t; + +namespace qpid { +namespace broker { + +class Broker; + +namespace amqp { + +class Session; +/** + * AMQP 1.0 protocol support for broker + */ +class Connection : public sys::ConnectionCodec, public ManagedConnection +{ + public: + Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse); + ~Connection(); + size_t decode(const char* buffer, size_t size); + size_t encode(char* buffer, size_t size); + bool canEncode(); + + void closed(); + bool isClosed() const; + + framing::ProtocolVersion getVersion() const; + pn_transport_t* getTransport(); + private: + typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions; + pn_connection_t* connection; + pn_transport_t* transport; + qpid::sys::OutputControl& out; + const std::string id; + qpid::broker::Broker& broker; + bool haveOutput; + Sessions sessions; + + void process(); + std::string getError(); +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP1_CONNECTION_H*/ diff --git a/cpp/src/qpid/broker/amqp/DataReader.cpp b/cpp/src/qpid/broker/amqp/DataReader.cpp new file mode 100644 index 0000000000..519dd71c9c --- /dev/null +++ b/cpp/src/qpid/broker/amqp/DataReader.cpp @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DataReader.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/log/Statement.h" +#include <string> +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace broker { +namespace amqp { +namespace { +qpid::amqp::CharSequence convert(pn_bytes_t in) +{ + qpid::amqp::CharSequence out; + out.data = in.start; + out.size = in.size; + return out; +} + +qpid::amqp::CharSequence convert(pn_uuid_t in) +{ + qpid::amqp::CharSequence out; + out.data = in.bytes; + out.size = 16; + return out; +} +} + +DataReader::DataReader(qpid::amqp::Reader& r) : reader(r) {} + +void DataReader::read(pn_data_t* data) +{ + /* + while (pn_data_next(data)) { + readOne(data); + } + */ + do { + readOne(data); + } while (pn_data_next(data)); +} +void DataReader::readOne(pn_data_t* data) +{ + qpid::amqp::Descriptor descriptor(0); + bool described = pn_data_is_described(data); + if (described) { + pn_data_enter(data); + pn_data_next(data); + if (pn_data_type(data) == PN_ULONG) { + descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(data)); + } else if (pn_data_type(data) == PN_SYMBOL) { + descriptor = qpid::amqp::Descriptor(convert(pn_data_get_symbol(data))); + } else { + QPID_LOG(notice, "Ignoring descriptor of type " << pn_data_type(data)); + } + pn_data_next(data); + } + switch (pn_data_type(data)) { + case PN_NULL: + reader.onNull(described ? &descriptor : 0); + break; + case PN_BOOL: + reader.onBoolean(pn_data_get_bool(data), described ? &descriptor : 0); + break; + case PN_UBYTE: + reader.onUByte(pn_data_get_ubyte(data), described ? &descriptor : 0); + break; + case PN_BYTE: + reader.onByte(pn_data_get_byte(data), described ? &descriptor : 0); + break; + case PN_USHORT: + reader.onUShort(pn_data_get_ushort(data), described ? &descriptor : 0); + break; + case PN_SHORT: + reader.onShort(pn_data_get_short(data), described ? &descriptor : 0); + break; + case PN_UINT: + reader.onUInt(pn_data_get_uint(data), described ? &descriptor : 0); + break; + case PN_INT: + reader.onInt(pn_data_get_int(data), described ? &descriptor : 0); + break; + case PN_CHAR: + pn_data_get_char(data); + break; + case PN_ULONG: + reader.onULong(pn_data_get_ulong(data), described ? &descriptor : 0); + break; + case PN_LONG: + reader.onLong(pn_data_get_long(data), described ? &descriptor : 0); + break; + case PN_TIMESTAMP: + reader.onTimestamp(pn_data_get_timestamp(data), described ? &descriptor : 0); + break; + case PN_FLOAT: + reader.onFloat(pn_data_get_float(data), described ? &descriptor : 0); + break; + case PN_DOUBLE: + reader.onDouble(pn_data_get_double(data), described ? &descriptor : 0); + break; + case PN_DECIMAL32: + pn_data_get_decimal32(data); + break; + case PN_DECIMAL64: + pn_data_get_decimal64(data); + break; + case PN_DECIMAL128: + pn_data_get_decimal128(data); + break; + case PN_UUID: + reader.onUuid(convert(pn_data_get_uuid(data)), described ? &descriptor : 0); + break; + case PN_BINARY: + reader.onBinary(convert(pn_data_get_binary(data)), described ? &descriptor : 0); + break; + case PN_STRING: + reader.onString(convert(pn_data_get_string(data)), described ? &descriptor : 0); + break; + case PN_SYMBOL: + reader.onSymbol(convert(pn_data_get_symbol(data)), described ? &descriptor : 0); + break; + case PN_DESCRIBED: + break; + case PN_ARRAY: + readArray(data, described ? &descriptor : 0); + break; + case PN_LIST: + readList(data, described ? &descriptor : 0); + break; + case PN_MAP: + readMap(data, described ? &descriptor : 0); + break; + } + if (described) pn_data_exit(data); +} + +void DataReader::readArray(pn_data_t* /*data*/, const qpid::amqp::Descriptor* /*descriptor*/) +{ + //not yet implemented +} + +void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descriptor) +{ + size_t count = pn_data_get_list(data); + reader.onStartList(count, qpid::amqp::CharSequence(), descriptor); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + read(data); + } + pn_data_exit(data); + reader.onEndList(count, descriptor); +} + +void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descriptor) +{ + size_t count = pn_data_get_map(data); + reader.onStartMap(count, qpid::amqp::CharSequence(), descriptor); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + read(data); + } + pn_data_exit(data); + reader.onEndMap(count, descriptor); +} +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/DataReader.h b/cpp/src/qpid/broker/amqp/DataReader.h new file mode 100644 index 0000000000..024507e7f2 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/DataReader.h @@ -0,0 +1,53 @@ +#ifndef QPID_BROKER_AMQP_DATAREADER_H +#define QPID_BROKER_AMQP_DATAREADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Reader.h" + +struct pn_data_t; + +namespace qpid { +namespace amqp { +struct Descriptor; +} +namespace broker { +namespace amqp { + +/** + * Allows use of Reader interface to read pn_data_t* data. + */ +class DataReader +{ + public: + DataReader(qpid::amqp::Reader& reader); + void read(pn_data_t*); + private: + qpid::amqp::Reader& reader; + + void readOne(pn_data_t*); + void readMap(pn_data_t*, const qpid::amqp::Descriptor*); + void readList(pn_data_t*, const qpid::amqp::Descriptor*); + void readArray(pn_data_t*, const qpid::amqp::Descriptor*); +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_DATAREADER_H*/ diff --git a/cpp/src/qpid/broker/amqp/Filter.cpp b/cpp/src/qpid/broker/amqp/Filter.cpp new file mode 100644 index 0000000000..38baba0df1 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Filter.cpp @@ -0,0 +1,150 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/Filter.h" +#include "qpid/broker/amqp/DataReader.h" +#include "qpid/broker/DirectExchange.h" +#include "qpid/broker/TopicExchange.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/log/Statement.h" +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace broker { +namespace amqp { + +void Filter::read(pn_data_t* data) +{ + try { + DataReader reader(*this); + reader.read(data); + } catch (const std::exception& e) { + QPID_LOG(warning, "Error parsing filter: " << e.what()); + } +} + +void Filter::write(pn_data_t* data) +{ + pn_data_put_map(data); + pn_data_enter(data); + subjectFilter.write(data); + pn_data_exit(data); +} + +void Filter::onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor) +{ + StringFilter filter; + filter.key = std::string(key.data, key.size); + filter.value = std::string(value.data, value.size); + if (descriptor) { + filter.described = true; + filter.descriptor = *descriptor; + if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE) + || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) { + setSubjectFilter(filter); + } else { + QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key << " and descriptor " << filter.descriptor); + } + } else { + setSubjectFilter(filter); + } +} + +bool Filter::hasSubjectFilter() const +{ + return !subjectFilter.value.empty(); +} + +std::string Filter::getSubjectFilter() const +{ + return subjectFilter.value; +} + + +void Filter::setSubjectFilter(const StringFilter& filter) +{ + if (hasSubjectFilter()) { + QPID_LOG(notice, "Skipping filter with key " << filter.key << "; subject filter already set"); + } else { + subjectFilter = filter; + } +} + +void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue) +{ + subjectFilter.bind(exchange, queue); +} + +Filter::StringFilter::StringFilter() : described(false), descriptor(0) {} +namespace { +pn_bytes_t convert(const std::string& in) +{ + pn_bytes_t out; + out.start = const_cast<char*>(in.data()); + out.size = in.size(); + return out; +} +pn_bytes_t convert(const qpid::amqp::CharSequence& in) +{ + pn_bytes_t out; + out.start = const_cast<char*>(in.data); + out.size = in.size; + return out; +} +} +void Filter::StringFilter::write(pn_data_t* data) +{ + pn_data_put_symbol(data, convert(key)); + if (described) { + pn_data_put_described(data); + pn_data_enter(data); + switch (descriptor.type) { + case qpid::amqp::Descriptor::NUMERIC: + pn_data_put_ulong(data, descriptor.value.code); + break; + case qpid::amqp::Descriptor::SYMBOLIC: + pn_data_put_symbol(data, convert(descriptor.value.symbol)); + break; + } + } + pn_data_put_string(data, convert(value)); + if (described) pn_data_exit(data); +} + +void Filter::StringFilter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue) +{ + if (described && exchange->getType() == DirectExchange::typeName + && descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) { + QPID_LOG(info, "Using legacy topic filter as a direct matching filter for " << exchange->getName()); + if (descriptor.type == qpid::amqp::Descriptor::NUMERIC) { + descriptor = qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE); + } else { + qpid::amqp::CharSequence symbol; + symbol.data = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.data(); + symbol.size = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.size(); + descriptor = qpid::amqp::Descriptor(symbol); + } + } + exchange->bind(queue, value, 0); +} + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Filter.h b/cpp/src/qpid/broker/amqp/Filter.h new file mode 100644 index 0000000000..20cceb372a --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Filter.h @@ -0,0 +1,63 @@ +#ifndef QPID_BROKER_AMQP_FILTER_H +#define QPID_BROKER_AMQP_FILTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MapReader.h" +#include "qpid/amqp/Descriptor.h" +#include <boost/shared_ptr.hpp> + +struct pn_data_t; +namespace qpid { +namespace broker { +class Exchange; +class Queue; +namespace amqp { + + +class Filter : qpid::amqp::MapReader +{ + public: + void read(pn_data_t*); + void write(pn_data_t*); + bool hasSubjectFilter() const; + std::string getSubjectFilter() const; + void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue); + private: + struct StringFilter + { + bool described; + qpid::amqp::Descriptor descriptor; + std::string key; + std::string value; + StringFilter(); + void write(pn_data_t*); + void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue); + }; + + void onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor); + void setSubjectFilter(const StringFilter&); + + StringFilter subjectFilter; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_FILTER_H*/ diff --git a/cpp/src/qpid/broker/amqp/Header.cpp b/cpp/src/qpid/broker/amqp/Header.cpp new file mode 100644 index 0000000000..493e757a56 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Header.cpp @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/Header.h" +#include "qpid/broker/Message.h" + +namespace qpid { +namespace broker { +namespace amqp { + +bool Header::isDurable() const +{ + return message.isPersistent(); +} + +uint8_t Header::getPriority() const +{ + return message.getPriority(); +} + +bool Header::hasTtl() const +{ + uint64_t dummy(0); + return message.getTtl(dummy); +} + +uint32_t Header::getTtl() const +{ + uint64_t ttl(0); + message.getTtl(ttl); + if (ttl > std::numeric_limits<uint32_t>::max()) return std::numeric_limits<uint32_t>::max(); + else return (uint32_t) ttl; +} + +bool Header::isFirstAcquirer() const +{ + return false;//TODO +} + +uint32_t Header::getDeliveryCount() const +{ + return message.getDeliveryCount(); +} + +Header::Header(const qpid::broker::Message& m) : message(m) {} + + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Header.h b/cpp/src/qpid/broker/amqp/Header.h new file mode 100644 index 0000000000..6e4f763028 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Header.h @@ -0,0 +1,50 @@ +#ifndef QPID_BROKER_AMQP_HEADER_H +#define QPID_BROKER_AMQP_HEADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MessageEncoder.h" + +namespace qpid { +namespace broker { +class Message; +namespace amqp { + +/** + * Adapts the broker current message abstraction to provide that + * required by the AMQP 1.0 message encoder. + */ +class Header : public qpid::amqp::MessageEncoder::Header +{ + public: + Header(const qpid::broker::Message&); + bool isDurable() const; + uint8_t getPriority() const; + bool hasTtl() const; + uint32_t getTtl() const; + bool isFirstAcquirer() const; + uint32_t getDeliveryCount() const; + private: + const qpid::broker::Message& message; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_HEADER_H*/ diff --git a/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/cpp/src/qpid/broker/amqp/ManagedConnection.cpp new file mode 100644 index 0000000000..0253ba5552 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/ManagedConnection.cpp @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/ManagedConnection.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/log/Statement.h" +#include "qmf/org/apache/qpid/broker/EventClientConnect.h" +#include "qmf/org/apache/qpid/broker/EventClientDisconnect.h" + +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace qpid { +namespace broker { +namespace amqp { + +ManagedConnection::ManagedConnection(Broker& broker, const std::string i) : id(i), agent(0) +{ + //management integration: + agent = broker.getManagementAgent(); + if (agent != 0) { + qpid::management::Manageable* parent = broker.GetVhostObject(); + // TODO set last bool true if system connection + connection = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, id, true, false, "AMQP 1.0")); + connection->set_shadow(false); + agent->addObject(connection); + } +} + +ManagedConnection::~ManagedConnection() +{ + if (agent && connection) { + agent->raiseEvent(_qmf::EventClientDisconnect(id, userid, connection->get_remoteProperties())); + connection->resourceDestroy(); + } + QPID_LOG_CAT(debug, model, "Delete connection. user:" << userid << " rhost:" << id); +} + +void ManagedConnection::setUserid(const std::string& uid) +{ + userid = uid; + if (agent && connection) { + connection->set_authIdentity(userid); + agent->raiseEvent(_qmf::EventClientConnect(id, userid, connection->get_remoteProperties())); + } + QPID_LOG_CAT(debug, model, "Create connection. user:" << userid << " rhost:" << id ); +} + +void ManagedConnection::setSaslMechanism(const std::string& mechanism) +{ + connection->set_saslMechanism(mechanism); +} + +void ManagedConnection::setSaslSsf(int ssf) +{ + connection->set_saslSsf(ssf); +} + +qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const +{ + return connection; +} + +std::string ManagedConnection::getId() const { return id; } +std::string ManagedConnection::getUserid() const { return userid; } + +bool ManagedConnection::isLocal(const ConnectionToken* t) const +{ + return this == t; +} +void ManagedConnection::outgoingMessageSent() +{ + if (connection) connection->inc_msgsToClient(); +} + +void ManagedConnection::incomingMessageReceived() +{ + if (connection) connection->inc_msgsFromClient(); +} + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/ManagedConnection.h b/cpp/src/qpid/broker/amqp/ManagedConnection.h new file mode 100644 index 0000000000..e2d0376918 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/ManagedConnection.h @@ -0,0 +1,59 @@ +#ifndef QPID_BROKER_AMQP_MANAGEDCONNECTION_H +#define QPID_BROKER_AMQP_MANAGEDCONNECTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/management/Manageable.h" +#include "qpid/broker/ConnectionToken.h" +#include "qmf/org/apache/qpid/broker/Connection.h" + +namespace qpid { +namespace management { +class ManagementAgent; +class ManagementObject; +} +namespace broker { +class Broker; +namespace amqp { + +class ManagedConnection : public qpid::management::Manageable, public ConnectionToken +{ + public: + ManagedConnection(Broker& broker, const std::string id); + virtual ~ManagedConnection(); + void setUserid(const std::string&); + std::string getId() const; + std::string getUserid() const; + void setSaslMechanism(const std::string&); + void setSaslSsf(int); + qpid::management::ManagementObject::shared_ptr GetManagementObject() const; + bool isLocal(const ConnectionToken* t) const; + void incomingMessageReceived(); + void outgoingMessageSent(); + private: + const std::string id; + std::string userid; + qmf::org::apache::qpid::broker::Connection::shared_ptr connection; + qpid::management::ManagementAgent* agent; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_MANAGEDCONNECTION_H*/ diff --git a/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp new file mode 100644 index 0000000000..f36a1e8da4 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ManagedOutgoingLink.h" +#include "qpid/broker/amqp/ManagedSession.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/types/Variant.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/log/Statement.h" + +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace qpid { +namespace broker { +namespace amqp { + +ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, Queue& q, ManagedSession& p, const std::string i, bool topic) + : parent(p), id(i) +{ + qpid::management::ManagementAgent* agent = broker.getManagementAgent(); + if (agent) { + subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObject()->getObjectId(), id, + false/*FIXME*/, true/*FIXME*/, topic, qpid::types::Variant::Map())); + agent->addObject(subscription); + subscription->set_creditMode("n/a"); + } +} +ManagedOutgoingLink::~ManagedOutgoingLink() +{ + if (subscription != 0) subscription->resourceDestroy(); +} + +qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObject() const +{ + return subscription; +} + +void ManagedOutgoingLink::outgoingMessageSent() +{ + if (subscription) { subscription->inc_delivered(); } + parent.outgoingMessageSent(); +} +void ManagedOutgoingLink::outgoingMessageAccepted() +{ + parent.outgoingMessageAccepted(); +} +void ManagedOutgoingLink::outgoingMessageRejected() +{ + parent.outgoingMessageRejected(); +} + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h b/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h new file mode 100644 index 0000000000..20a1095db2 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h @@ -0,0 +1,53 @@ +#ifndef QPID_BROKER_AMQP_MANAGEDOUTGOINGLINK_H +#define QPID_BROKER_AMQP_MANAGEDOUTGOINGLINK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/management/Manageable.h" +#include "qmf/org/apache/qpid/broker/Subscription.h" + +namespace qpid { +namespace management { +class ManagementObject; +} +namespace broker { +class Broker; +class Queue; +namespace amqp { +class ManagedSession; + +class ManagedOutgoingLink : public qpid::management::Manageable +{ + public: + ManagedOutgoingLink(Broker& broker, Queue&, ManagedSession& parent, const std::string id, bool topic); + virtual ~ManagedOutgoingLink(); + qpid::management::ManagementObject::shared_ptr GetManagementObject() const; + void outgoingMessageSent(); + void outgoingMessageAccepted(); + void outgoingMessageRejected(); + private: + ManagedSession& parent; + const std::string id; + qmf::org::apache::qpid::broker::Subscription::shared_ptr subscription; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_MANAGEDOUTGOINGLINK_H*/ diff --git a/cpp/src/qpid/broker/amqp/ManagedSession.cpp b/cpp/src/qpid/broker/amqp/ManagedSession.cpp new file mode 100644 index 0000000000..9bef0e842b --- /dev/null +++ b/cpp/src/qpid/broker/amqp/ManagedSession.cpp @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/ManagedSession.h" +#include "qpid/broker/amqp/ManagedConnection.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/log/Statement.h" + +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace qpid { +namespace broker { +namespace amqp { + +ManagedSession::ManagedSession(Broker& broker, ManagedConnection& p, const std::string i) : parent(p), id(i), unacked(0) +{ + qpid::management::ManagementAgent* agent = broker.getManagementAgent(); + if (agent != 0) { + session = _qmf::Session::shared_ptr(new _qmf::Session(agent, this, broker.GetVhostObject(), id)); + session->set_attached(true); + session->set_detachedLifespan(0); + session->clr_expireTime(); + session->set_connectionRef(parent.GetManagementObject()->getObjectId()); + agent->addObject(session); + } +} + +ManagedSession::~ManagedSession() +{ + if (session) session->resourceDestroy(); +} + +qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObject() const +{ + return session; +} + +bool ManagedSession::isLocal(const ConnectionToken* t) const +{ + return &parent == t; +} + +void ManagedSession::outgoingMessageSent() +{ + if (session) session->set_unackedMessages(++unacked); + parent.outgoingMessageSent(); +} +void ManagedSession::outgoingMessageAccepted() +{ + if (session) session->set_unackedMessages(--unacked); +} +void ManagedSession::outgoingMessageRejected() +{ + if (session) session->set_unackedMessages(--unacked); +} + +void ManagedSession::incomingMessageReceived() +{ + parent.incomingMessageReceived(); +} +void ManagedSession::incomingMessageAccepted() +{ + +} +void ManagedSession::incomingMessageRejected() +{ + +} + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/ManagedSession.h b/cpp/src/qpid/broker/amqp/ManagedSession.h new file mode 100644 index 0000000000..1f56964bb6 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/ManagedSession.h @@ -0,0 +1,59 @@ +#ifndef QPID_BROKER_AMQP_MANAGEDSESSION_H +#define QPID_BROKER_AMQP_MANAGEDSESSION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/management/Manageable.h" +#include "qmf/org/apache/qpid/broker/Session.h" +#include "qpid/broker/ConnectionToken.h" +#include "qpid/broker/OwnershipToken.h" + +namespace qpid { +namespace management { +class ManagementObject; +} +namespace broker { +class Broker; +namespace amqp { +class ManagedConnection; + +class ManagedSession : public qpid::management::Manageable, public OwnershipToken +{ + public: + ManagedSession(Broker& broker, ManagedConnection& parent, const std::string id); + virtual ~ManagedSession(); + qpid::management::ManagementObject::shared_ptr GetManagementObject() const; + bool isLocal(const ConnectionToken* t) const; + void incomingMessageReceived(); + void incomingMessageAccepted(); + void incomingMessageRejected(); + void outgoingMessageSent(); + void outgoingMessageAccepted(); + void outgoingMessageRejected(); + private: + ManagedConnection& parent; + const std::string id; + qmf::org::apache::qpid::broker::Session::shared_ptr session; + size_t unacked; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_MANAGEDSESSION_H*/ diff --git a/cpp/src/qpid/broker/amqp/Message.cpp b/cpp/src/qpid/broker/amqp/Message.cpp new file mode 100644 index 0000000000..a4c346e131 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Message.cpp @@ -0,0 +1,264 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Message.h" +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/Buffer.h" +#include <string.h> + +namespace qpid { +namespace broker { +namespace amqp { + +namespace { +std::string empty; +} + +std::string Message::getRoutingKey() const +{ + std::string v; + v.assign(subject.data, subject.size); + return v; +} +std::string Message::getUserId() const +{ + std::string v; + v.assign(userId.data, userId.size); + return v; +} + +bool Message::isPersistent() const +{ + return durable && durable.get(); +} +bool Message::getTtl(uint64_t& t) const +{ + if (!ttl) { + return false; + } else { + t = ttl.get(); + return true; + } +} + +uint8_t Message::getPriority() const +{ + if (!priority) return 4; + else return priority.get(); +} + +std::string Message::getPropertyAsString(const std::string& /*key*/) const { return empty; } +std::string Message::getAnnotationAsString(const std::string& /*key*/) const { return empty; } +void Message::processProperties(MapHandler&) const {} + +//getContentSize() is primarily used in stats about the number of +//bytes enqueued/dequeued etc, not sure whether this is the right name +//and whether it should indeed only be the content that is thus +//measured +uint64_t Message::getContentSize() const { return data.size(); } +//getContent() is used primarily for decoding qmf messages in management and ha +std::string Message::getContent() const { return empty; } + +Message::Message(size_t size) : data(size) +{ + deliveryAnnotations.init(); + messageAnnotations.init(); + bareMessage.init(); + + userId.init(); + to.init(); + subject.init(); + replyTo.init(); + contentType.init(); + contentEncoding.init(); + + applicationProperties.init(); + body.init(); + footer.init(); +} +char* Message::getData() { return &data[0]; } +const char* Message::getData() const { return &data[0]; } +size_t Message::getSize() const { return data.size(); } + +qpid::amqp::MessageId Message::getMessageId() const +{ + return messageId; +} +qpid::amqp::CharSequence Message::getReplyTo() const +{ + return replyTo; +} +qpid::amqp::MessageId Message::getCorrelationId() const +{ + return correlationId; +} +qpid::amqp::CharSequence Message::getContentType() const +{ + return contentType; +} +qpid::amqp::CharSequence Message::getContentEncoding() const +{ + return contentEncoding; +} + +qpid::amqp::CharSequence Message::getDeliveryAnnotations() const +{ + return deliveryAnnotations; +} +qpid::amqp::CharSequence Message::getMessageAnnotations() const +{ + return messageAnnotations; +} +qpid::amqp::CharSequence Message::getApplicationProperties() const +{ + return applicationProperties; +} +qpid::amqp::CharSequence Message::getBareMessage() const +{ + return bareMessage; +} +qpid::amqp::CharSequence Message::getBody() const +{ + return body; +} +qpid::amqp::CharSequence Message::getFooter() const +{ + return footer; +} + +void Message::scan() +{ + qpid::amqp::Decoder decoder(getData(), getSize()); + decoder.read(*this); + bareMessage = qpid::amqp::MessageReader::getBareMessage(); + if (bareMessage.data && !bareMessage.size) { + bareMessage.size = getSize() - (bareMessage.data - getData()); + } +} + +const Message& Message::get(const qpid::broker::Message& message) +{ + const Message* m = dynamic_cast<const Message*>(&message.getEncoding()); + if (!m) throw qpid::Exception("Translation not yet implemented!!"); + return *m; +} + +void Message::onDurable(bool b) { durable = b; } +void Message::onPriority(uint8_t i) { priority = i; } +void Message::onTtl(uint32_t i) { ttl = i; } +void Message::onFirstAcquirer(bool b) { firstAcquirer = b; } +void Message::onDeliveryCount(uint32_t i) { deliveryCount = i; } + +void Message::onMessageId(uint64_t v) { messageId.set(v); } +void Message::onMessageId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { messageId.set(v, t); } +void Message::onUserId(const qpid::amqp::CharSequence& v) { userId = v; } +void Message::onTo(const qpid::amqp::CharSequence& v) { to = v; } +void Message::onSubject(const qpid::amqp::CharSequence& v) { subject = v; } +void Message::onReplyTo(const qpid::amqp::CharSequence& v) { replyTo = v; } +void Message::onCorrelationId(uint64_t v) { correlationId.set(v); } +void Message::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { correlationId.set(v, t);} +void Message::onContentType(const qpid::amqp::CharSequence& v) { contentType = v; } +void Message::onContentEncoding(const qpid::amqp::CharSequence& v) { contentEncoding = v; } +void Message::onAbsoluteExpiryTime(int64_t) {} +void Message::onCreationTime(int64_t) {} +void Message::onGroupId(const qpid::amqp::CharSequence&) {} +void Message::onGroupSequence(uint32_t) {} +void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {} + +void Message::onApplicationProperties(const qpid::amqp::CharSequence& v) { applicationProperties = v; } +void Message::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { deliveryAnnotations = v; } +void Message::onMessageAnnotations(const qpid::amqp::CharSequence& v) { messageAnnotations = v; } +void Message::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&) { body = v; } +void Message::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {} +void Message::onFooter(const qpid::amqp::CharSequence& v) { footer = v; } + + +//PersistableMessage interface: +void Message::encode(framing::Buffer& buffer) const +{ + buffer.putLong(0);//4-byte format indicator + buffer.putRawData((const uint8_t*) getData(), getSize()); + QPID_LOG(debug, "Encoded 1.0 message of " << getSize() << " bytes, including " << bareMessage.size << " bytes of 'bare message'"); +} +uint32_t Message::encodedSize() const +{ + return 4/*format indicator*/ + data.size(); +} +//in 1.0 the binary header/content makes less sense and in any case +//the functionality that split originally supported (i.e. lazy-loaded +//messages) is no longer in use; for 1.0 we therefore treat the whole +//content as 'header' and load it in the first stage. +uint32_t Message::encodedHeaderSize() const +{ + return encodedSize(); +} +void Message::decodeHeader(framing::Buffer& buffer) +{ + if (buffer.available() != getSize()) { + QPID_LOG(warning, "1.0 Message buffer was " << data.size() << " bytes, but " << buffer.available() << " bytes are available. Resizing."); + data.resize(buffer.available()); + } + buffer.getRawData((uint8_t*) getData(), getSize()); + scan(); + QPID_LOG(debug, "Decoded 1.0 message of " << getSize() << " bytes, including " << bareMessage.size << " bytes of 'bare message'"); +} +void Message::decodeContent(framing::Buffer& /*buffer*/) {} + +boost::intrusive_ptr<PersistableMessage> Message::merge(const std::map<std::string, qpid::types::Variant>& annotations) const +{ + //message- or delivery- annotations? would have to determine that from the name, for now assume always message-annotations + size_t extra = 0; + if (messageAnnotations) { + //TODO: actual merge required + } else { + //add whole new section + extra = qpid::amqp::MessageEncoder::getEncodedSize(annotations, true); + } + boost::intrusive_ptr<Message> copy(new Message(data.size()+extra)); + size_t position(0); + if (deliveryAnnotations) { + ::memcpy(©->data[position], deliveryAnnotations.data, deliveryAnnotations.size); + position += deliveryAnnotations.size; + } + if (messageAnnotations) { + //TODO: actual merge required + ::memcpy(©->data[position], messageAnnotations.data, messageAnnotations.size); + position += messageAnnotations.size; + } else { + qpid::amqp::MessageEncoder encoder(©->data[position], extra); + encoder.writeMap(annotations, &qpid::amqp::message::MESSAGE_ANNOTATIONS, true); + position += extra; + } + if (bareMessage) { + ::memcpy(©->data[position], bareMessage.data, bareMessage.size); + position += bareMessage.size; + } + if (footer) { + ::memcpy(©->data[position], footer.data, footer.size); + position += footer.size; + } + copy->scan(); + return copy; +} + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Message.h b/cpp/src/qpid/broker/amqp/Message.h new file mode 100644 index 0000000000..cc3406f72a --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Message.h @@ -0,0 +1,148 @@ +#ifndef QPID_BROKER_AMQP_MESSAGE_H +#define QPID_BROKER_AMQP_MESSAGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Message.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/MessageId.h" +#include "qpid/amqp/MessageReader.h" +#include <boost/optional.hpp> + +namespace qpid { +namespace framing { +class Buffer; +} +namespace broker { +namespace amqp { + +/** + * Represents an AMQP 1.0 format message + */ +class Message : public qpid::broker::Message::Encoding, private qpid::amqp::MessageReader, public qpid::broker::PersistableMessage +{ + public: + //Encoding interface: + std::string getRoutingKey() const; + bool isPersistent() const; + uint8_t getPriority() const; + uint64_t getContentSize() const; + std::string getPropertyAsString(const std::string& key) const; + std::string getAnnotationAsString(const std::string& key) const; + bool getTtl(uint64_t&) const; + std::string getContent() const; + void processProperties(MapHandler&) const; + std::string getUserId() const; + + qpid::amqp::MessageId getMessageId() const; + qpid::amqp::CharSequence getReplyTo() const; + qpid::amqp::MessageId getCorrelationId() const; + qpid::amqp::CharSequence getContentType() const; + qpid::amqp::CharSequence getContentEncoding() const; + + qpid::amqp::CharSequence getDeliveryAnnotations() const; + qpid::amqp::CharSequence getMessageAnnotations() const; + qpid::amqp::CharSequence getApplicationProperties() const; + qpid::amqp::CharSequence getBareMessage() const; + qpid::amqp::CharSequence getBody() const; + qpid::amqp::CharSequence getFooter() const; + + Message(size_t size); + char* getData(); + const char* getData() const; + size_t getSize() const; + void scan(); + + //PersistableMessage interface: + void encode(framing::Buffer& buffer) const; + uint32_t encodedSize() const; + void decodeHeader(framing::Buffer& buffer); + void decodeContent(framing::Buffer& buffer); + uint32_t encodedHeaderSize() const; + boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const; + + static const Message& get(const qpid::broker::Message&); + private: + std::vector<char> data; + + //header: + boost::optional<bool> durable; + boost::optional<uint8_t> priority; + boost::optional<uint32_t> ttl; + boost::optional<bool> firstAcquirer; + boost::optional<uint32_t> deliveryCount; + //annotations: + qpid::amqp::CharSequence deliveryAnnotations; + qpid::amqp::CharSequence messageAnnotations; + + qpid::amqp::CharSequence bareMessage;//properties, application-properties and content + //properties: + qpid::amqp::MessageId messageId; + qpid::amqp::CharSequence userId; + qpid::amqp::CharSequence to; + qpid::amqp::CharSequence subject; + qpid::amqp::CharSequence replyTo; + qpid::amqp::MessageId correlationId; + qpid::amqp::CharSequence contentType; + qpid::amqp::CharSequence contentEncoding; + + //application-properties: + qpid::amqp::CharSequence applicationProperties; + + //body: + qpid::amqp::CharSequence body; + + //footer: + qpid::amqp::CharSequence footer; + + //header: + void onDurable(bool b); + void onPriority(uint8_t i); + void onTtl(uint32_t i); + void onFirstAcquirer(bool b); + void onDeliveryCount(uint32_t i); + //properties: + void onMessageId(uint64_t); + void onMessageId(const qpid::amqp::CharSequence&, qpid::types::VariantType); + void onUserId(const qpid::amqp::CharSequence& v); + void onTo(const qpid::amqp::CharSequence& v); + void onSubject(const qpid::amqp::CharSequence& v); + void onReplyTo(const qpid::amqp::CharSequence& v); + void onCorrelationId(uint64_t); + void onCorrelationId(const qpid::amqp::CharSequence&, qpid::types::VariantType); + void onContentType(const qpid::amqp::CharSequence& v); + void onContentEncoding(const qpid::amqp::CharSequence& v); + void onAbsoluteExpiryTime(int64_t i); + void onCreationTime(int64_t); + void onGroupId(const qpid::amqp::CharSequence&); + void onGroupSequence(uint32_t); + void onReplyToGroupId(const qpid::amqp::CharSequence&); + + void onApplicationProperties(const qpid::amqp::CharSequence&); + void onDeliveryAnnotations(const qpid::amqp::CharSequence&); + void onMessageAnnotations(const qpid::amqp::CharSequence&); + void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&); + void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&); + void onFooter(const qpid::amqp::CharSequence&); +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_MESSAGE_H*/ diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/cpp/src/qpid/broker/amqp/NodeProperties.cpp new file mode 100644 index 0000000000..eea7612cb9 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/NodeProperties.cpp @@ -0,0 +1,179 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/NodeProperties.h" +#include "qpid/broker/amqp/DataReader.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/types/Variant.h" +#include "qpid/broker/QueueSettings.h" +#include "qpid/log/Statement.h" + +using qpid::amqp::CharSequence; +using qpid::amqp::Descriptor; + +namespace qpid { +namespace broker { +namespace amqp { +namespace { +//distribution modes: +const std::string MOVE("move"); +const std::string COPY("copy"); +const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); + +//AMQP 0-10 standard parameters: +const std::string DURABLE("durable"); +const std::string AUTO_DELETE("auto-delete"); +const std::string ALTERNATE_EXCHANGE("alternate-exchange"); +const std::string EXCHANGE_TYPE("exchange-type"); +} + +NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exchangeType("topic") {} + +void NodeProperties::read(pn_data_t* data) +{ + DataReader reader(*this); + reader.read(data); +} + +void NodeProperties::process(const std::string& key, const qpid::types::Variant& value) +{ + QPID_LOG(notice, "Processing node property " << key << " = " << value); + if (key == SUPPORTED_DIST_MODES) { + if (value == MOVE) queue = true; + else if (value == COPY) queue = false; + } else if (key == DURABLE) { + durable = value; + } else if (key == AUTO_DELETE) { + autoDelete = value; + } else if (key == ALTERNATE_EXCHANGE) { + alternateExchange = value.asString(); + } else if (key == EXCHANGE_TYPE) { + exchangeType = value.asString(); + } else { + properties[key] = value; + } +} + +void NodeProperties::onNullValue(const CharSequence& key, const Descriptor*) +{ + process(key.str(), qpid::types::Variant()); +} + +void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +{ + process(key.str(), value.str()); +} + +void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +{ + process(key.str(), value.str()); +} + +void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +{ + process(key.str(), value.str()); +} + +QueueSettings NodeProperties::getQueueSettings() +{ + QueueSettings settings(durable, autoDelete); + qpid::types::Variant::Map unused; + settings.populate(properties, unused); + return settings; +} + +bool NodeProperties::isQueue() const +{ + return queue; +} +bool NodeProperties::isDurable() const +{ + return durable; +} +std::string NodeProperties::getExchangeType() const +{ + return exchangeType; +} +std::string NodeProperties::getAlternateExchange() const +{ + return alternateExchange; +} + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.h b/cpp/src/qpid/broker/amqp/NodeProperties.h new file mode 100644 index 0000000000..b81d1d712c --- /dev/null +++ b/cpp/src/qpid/broker/amqp/NodeProperties.h @@ -0,0 +1,71 @@ +#ifndef QPID_BROKER_AMQP_NODEPROPERTIES_H +#define QPID_BROKER_AMQP_NODEPROPERTIES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MapReader.h" +#include "qpid/types/Variant.h" + +struct pn_data_t; +namespace qpid { +namespace broker { +struct QueueSettings; +namespace amqp { + +class NodeProperties : public qpid::amqp::MapReader +{ + public: + NodeProperties(); + void read(pn_data_t*); + void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); + void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*); + void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*); + void onUShortValue(const qpid::amqp::CharSequence&, uint16_t, const qpid::amqp::Descriptor*); + void onUIntValue(const qpid::amqp::CharSequence&, uint32_t, const qpid::amqp::Descriptor*); + void onULongValue(const qpid::amqp::CharSequence&, uint64_t, const qpid::amqp::Descriptor*); + void onByteValue(const qpid::amqp::CharSequence&, int8_t, const qpid::amqp::Descriptor*); + void onShortValue(const qpid::amqp::CharSequence&, int16_t, const qpid::amqp::Descriptor*); + void onIntValue(const qpid::amqp::CharSequence&, int32_t, const qpid::amqp::Descriptor*); + void onLongValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*); + void onFloatValue(const qpid::amqp::CharSequence&, float, const qpid::amqp::Descriptor*); + void onDoubleValue(const qpid::amqp::CharSequence&, double, const qpid::amqp::Descriptor*); + void onUuidValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); + void onTimestampValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*); + void onStringValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); + void onSymbolValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); + bool isQueue() const; + QueueSettings getQueueSettings(); + bool isDurable() const; + std::string getExchangeType() const; + std::string getAlternateExchange() const; + private: + bool queue; + bool durable; + bool autoDelete; + std::string exchangeType; + std::string alternateExchange; + qpid::types::Variant::Map properties; + + void process(const std::string&, const qpid::types::Variant&); +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_NODEPROPERTIES_H*/ diff --git a/cpp/src/qpid/broker/amqp/Outgoing.cpp b/cpp/src/qpid/broker/amqp/Outgoing.cpp new file mode 100644 index 0000000000..9605cacac1 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -0,0 +1,244 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/Outgoing.h" +#include "qpid/broker/amqp/Header.h" +#include "qpid/broker/amqp/Translation.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/TopicKeyNode.h" +#include "qpid/sys/OutputControl.h" +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { +namespace amqp { + +Outgoing::Outgoing(Broker& broker, boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession& session, qpid::sys::OutputControl& o, bool topic) + : Consumer(pn_link_name(l), /*FIXME*/CONSUMER), + ManagedOutgoingLink(broker, *q, session, pn_link_name(l), topic), + exclusive(topic), + queue(q), deliveries(5000), link(l), out(o), + current(0), outstanding(0), + buffer(1024)/*used only for header at present*/ +{ + for (size_t i = 0 ; i < deliveries.capacity(); ++i) { + deliveries[i].init(i); + } +} + +void Outgoing::init() +{ + queue->consume(shared_from_this(), exclusive);//may throw exception +} + +bool Outgoing::dispatch() +{ + QPID_LOG(trace, "Dispatching to " << getName() << ": " << pn_link_credit(link)); + if (canDeliver()) { + if (queue->dispatch(shared_from_this())) { + return true; + } else { + pn_link_drained(link); + QPID_LOG(debug, "No message available on " << queue->getName()); + } + } else { + QPID_LOG(debug, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link)); + } + return false; +} + +void Outgoing::write(const char* data, size_t size) +{ + pn_link_send(link, data, size); +} + +void Outgoing::handle(pn_delivery_t* delivery) +{ + pn_delivery_tag_t tag = pn_delivery_tag(delivery); + size_t i = *reinterpret_cast<const size_t*>(tag.bytes); + Record& r = deliveries[i]; + if (pn_delivery_writable(delivery)) { + assert(r.msg); + assert(!r.delivery); + r.delivery = delivery; + //write header + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeHeader(Header(r.msg)); + write(&buffer[0], encoder.getPosition()); + Translation t(r.msg); + t.write(*this); + if (pn_link_advance(link)) { + --outstanding; + outgoingMessageSent(); + QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); + } else { + QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); + } + } + if (pn_delivery_updated(delivery)) { + assert(r.delivery == delivery); + r.disposition = pn_delivery_remote_state(delivery); + if (r.disposition) { + switch (r.disposition) { + case PN_ACCEPTED: + //TODO: only if consuming + queue->dequeue(0, r.cursor); + outgoingMessageAccepted(); + break; + case PN_REJECTED: + queue->reject(r.cursor); + outgoingMessageRejected(); + break; + case PN_RELEASED: + queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented + outgoingMessageRejected();//TODO: not quite true... + break; + case PN_MODIFIED: + queue->release(r.cursor, true);//TODO: proper handling of modified + outgoingMessageRejected();//TODO: not quite true... + break; + default: + QPID_LOG(warning, "Unhandled disposition: " << r.disposition); + } + //TODO: ony settle once any dequeue on store has completed + pn_delivery_settle(delivery); + r.reset(); + } + } +} + +bool Outgoing::canDeliver() +{ + return deliveries[current].delivery == 0 && pn_link_credit(link) > outstanding; +} + +void Outgoing::detached() +{ + QPID_LOG(debug, "Detaching outgoing link from " << queue->getName()); + queue->cancel(shared_from_this()); + //TODO: release in a clearer order? + for (size_t i = 0 ; i < deliveries.capacity(); ++i) { + if (deliveries[i].msg) queue->release(deliveries[i].cursor, true); + } + Queue::tryAutoDelete(*queue->getBroker(), queue, "", ""); +} + +//Consumer interface: +bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg) +{ + Record& r = deliveries[current++]; + if (current >= deliveries.capacity()) current = 0; + r.cursor = cursor; + r.msg = msg; + pn_delivery(link, r.tag); + QPID_LOG(debug, "Requested delivery of " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); + ++outstanding; + return true; +} + +void Outgoing::notify() +{ + QPID_LOG(trace, "Notification received for " << queue->getName()); + out.activateOutput(); +} + +bool Outgoing::accept(const qpid::broker::Message&) +{ + return true; +} + +void Outgoing::setSubjectFilter(const std::string& f) +{ + subjectFilter = f; +} + +namespace { + +bool match(TokenIterator& filter, TokenIterator& target) +{ + bool wild = false; + while (!filter.finished()) + { + if (filter.match1('*')) { + if (target.finished()) return false; + //else move to next word in filter target + filter.next(); + target.next(); + } else if (filter.match1('#')) { + // i.e. filter word is '#' which can match a variable number of words in the target + filter.next(); + if (filter.finished()) return true; + else if (target.finished()) return false; + wild = true; + } else { + //filter word needs to match target exactly + if (target.finished()) return false; + std::string word; + target.pop(word); + if (filter.match(word)) { + wild = false; + filter.next(); + } else if (!wild) { + return false; + } + } + } + return target.finished(); +} +bool match(const std::string& filter, const std::string& target) +{ + TokenIterator lhs(filter); + TokenIterator rhs(target); + return match(lhs, rhs); +} +} + +bool Outgoing::filter(const qpid::broker::Message& m) +{ + return subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()); +} + +void Outgoing::cancel() {} + +void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {} + +qpid::broker::OwnershipToken* Outgoing::getSession() +{ + return 0; +} + +Outgoing::Record::Record() : delivery(0), disposition(0), index(0) {} +void Outgoing::Record::init(size_t i) +{ + index = i; + tag.bytes = reinterpret_cast<const char*>(&index); + tag.size = sizeof(index); +} +void Outgoing::Record::reset() +{ + cursor = QueueCursor(); + msg = qpid::broker::Message(); + delivery = 0; + disposition = 0; +} + + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Outgoing.h b/cpp/src/qpid/broker/amqp/Outgoing.h new file mode 100644 index 0000000000..a8450a48cf --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Outgoing.h @@ -0,0 +1,108 @@ +#ifndef QPID_BROKER_AMQP1_OUTGOING_H +#define QPID_BROKER_AMQP1_OUTGOING_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/Message.h" +#include "qpid/broker/amqp/ManagedOutgoingLink.h" +#include "qpid/broker/Consumer.h" +#include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace sys { +class OutputControl; +} +namespace broker { +class Broker; +class Queue; +namespace amqp { +class ManagedSession; +template <class T> +class CircularArray +{ + public: + CircularArray(size_t l) : limit(l), data(new T[limit]) {} + T& operator[](size_t i) { return data[i]; } + size_t capacity() { return limit; } + ~CircularArray() { delete [] data; } + private: + const size_t limit; + T* const data; + size_t next; +}; + +/** + * + */ +class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from_this<Outgoing>, public ManagedOutgoingLink +{ + public: + Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession&, qpid::sys::OutputControl& o, bool topic); + void setSubjectFilter(const std::string&); + void init(); + bool dispatch(); + void write(const char* data, size_t size); + void handle(pn_delivery_t* delivery); + bool canDeliver(); + void detached(); + + //Consumer interface: + bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg); + void notify(); + bool accept(const qpid::broker::Message&); + bool filter(const qpid::broker::Message&); + void cancel(); + void acknowledged(const qpid::broker::DeliveryRecord&); + qpid::broker::OwnershipToken* getSession(); + + private: + + struct Record + { + QueueCursor cursor; + qpid::broker::Message msg; + pn_delivery_t* delivery; + int disposition; + size_t index; + pn_delivery_tag_t tag; + + Record(); + void init(size_t i); + void reset(); + }; + + const bool exclusive; + boost::shared_ptr<Queue> queue; + CircularArray<Record> deliveries; + pn_link_t* link; + qpid::sys::OutputControl& out; + size_t current; + int outstanding; + std::vector<char> buffer; + std::string subjectFilter; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP1_OUTGOING_H*/ diff --git a/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp new file mode 100644 index 0000000000..711592257c --- /dev/null +++ b/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/Plugin.h" +#include "qpid/SaslFactory.h" +#include "qpid/NullSaslServer.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Protocol.h" +#include "qpid/broker/RecoverableMessage.h" +#include "qpid/broker/RecoverableMessageImpl.h" +#include "qpid/broker/amqp/Connection.h" +#include "qpid/broker/amqp/Message.h" +#include "qpid/broker/amqp/Sasl.h" +#include "qpid/broker/amqp/Translation.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { +namespace amqp { + +class ProtocolImpl : public Protocol +{ + public: + ProtocolImpl(Broker& b) : broker(b) {} + qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&); + boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&); + private: + Broker& broker; +}; + +struct ProtocolPlugin : public Plugin +{ + void earlyInitialize(Plugin::Target& target) + { + //need to register protocol before recovery from store + broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target); + if (broker) { + broker->getProtocolRegistry().add("AMQP 1.0", new ProtocolImpl(*broker)); + } + } + + void initialize(Plugin::Target&) {} +}; + +ProtocolPlugin instance; // Static initialization + +qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVersion& v, qpid::sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& external) +{ + if (v == qpid::framing::ProtocolVersion(1, 0)) { + if (v.getProtocol() == qpid::framing::ProtocolVersion::SASL) { + if (broker.getOptions().auth) { + QPID_LOG(info, "Using AMQP 1.0 (with SASL layer)"); + return new qpid::broker::amqp::Sasl(out, id, broker, qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm, broker.getOptions().requireEncrypted, external)); + } else { + std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(broker.getOptions().realm)); + QPID_LOG(info, "Using AMQP 1.0 (with dummy SASL layer)"); + return new qpid::broker::amqp::Sasl(out, id, broker, authenticator); + } + } else { + if (broker.getOptions().auth) { + throw qpid::Exception("SASL layer required!"); + } else { + QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)"); + return new qpid::broker::amqp::Connection(out, id, broker, false); + } + } + } + return 0; +} + +boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolImpl::translate(const qpid::broker::Message& m) +{ + qpid::broker::amqp::Translation t(m); + return t.getTransfer(); +} + +boost::shared_ptr<RecoverableMessage> ProtocolImpl::recover(qpid::framing::Buffer& buffer) +{ + QPID_LOG(debug, "Recovering, checking for 1.0 message format indicator..."); + uint32_t format = buffer.getLong(); + if (format == 0) { + QPID_LOG(debug, "Recovered message IS in 1.0 format"); + //this is a 1.0 format message + boost::intrusive_ptr<qpid::broker::amqp::Message> m(new qpid::broker::amqp::Message(buffer.available())); + m->decodeHeader(buffer); + return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(qpid::broker::Message(m, m))); + } else { + QPID_LOG(debug, "Recovered message is NOT in 1.0 format"); + return RecoverableMessage::shared_ptr(); + } +} + + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Sasl.cpp b/cpp/src/qpid/broker/amqp/Sasl.cpp new file mode 100644 index 0000000000..4b89e7b15d --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Sasl.cpp @@ -0,0 +1,167 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/Sasl.h" +#include "qpid/broker/Broker.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/sys/OutputControl.h" +#include "qpid/sys/SecurityLayer.h" +#include <boost/format.hpp> +#include <vector> + +namespace qpid { +namespace broker { +namespace amqp { + +Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, std::auto_ptr<qpid::SaslServer> auth) + : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, true), + authenticator(auth), + state(INCOMPLETE), writeHeader(true), haveOutput(true) +{ + out.activateOutput(); + mechanisms(authenticator->getMechanisms()); +} + +Sasl::~Sasl() {} + +size_t Sasl::decode(const char* buffer, size_t size) +{ + if (state == AUTHENTICATED) { + if (securityLayer.get()) return securityLayer->decode(buffer, size); + else return connection.decode(buffer, size); + } else if (state == INCOMPLETE && size) { + size_t decoded = read(buffer, size); + QPID_LOG(trace, id << " Sasl::decode(" << size << "): " << decoded); + return decoded; + } else { + return 0; + } +} + +size_t Sasl::encode(char* buffer, size_t size) +{ + if (state == AUTHENTICATED) { + if (securityLayer.get()) return securityLayer->encode(buffer, size); + else return connection.encode(buffer, size); + } else { + size_t encoded = 0; + if (writeHeader) { + encoded += writeProtocolHeader(buffer, size); + if (!encoded) return 0; + writeHeader = false; + } + if (encoded < size) { + encoded += write(buffer + encoded, size - encoded); + } + if (state == SUCCESS_PENDING) { + state = AUTHENTICATED; + } else if (state == FAILURE_PENDING) { + state = FAILED; + } else { + haveOutput = (encoded == size); + } + QPID_LOG(trace, id << " Sasl::encode(" << size << "): " << encoded); + return encoded; + } +} + +bool Sasl::canEncode() +{ + if (state == AUTHENTICATED) { + if (securityLayer.get()) return securityLayer->canEncode(); + else return connection.canEncode(); + } else { + return haveOutput; + } +} + +void Sasl::closed() +{ + if (state == AUTHENTICATED) { + connection.closed(); + } else { + QPID_LOG(info, id << " Connection closed prior to authentication completing"); + state = FAILED; + } +} +bool Sasl::isClosed() const +{ + if (state == AUTHENTICATED) { + return connection.isClosed(); + } else { + return state == FAILED; + } +} + +framing::ProtocolVersion Sasl::getVersion() const +{ + return connection.getVersion(); +} +namespace { +const std::string EMPTY; +} + +void Sasl::init(const std::string& mechanism, const std::string* response, const std::string* /*hostname*/) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-INIT(" << mechanism << ", " << (response ? *response : EMPTY) << ")"); + //TODO: what should we do with hostname here? + std::string c; + respond(authenticator->start(mechanism, response, c), c); + connection.setSaslMechanism(mechanism); +} + +void Sasl::response(const std::string* r) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-RESPONSE(" << (r ? *r : EMPTY) << ")"); + std::string c; + respond(authenticator->step(r, c), c); +} + +void Sasl::respond(qpid::SaslServer::Status status, const std::string& chllnge) +{ + switch (status) { + case qpid::SaslServer::OK: + connection.setUserid(authenticator->getUserid()); + completed(true); + //can't set authenticated & failed until we have actually sent the outcome + state = SUCCESS_PENDING; + securityLayer = authenticator->getSecurityLayer(65535); + if (securityLayer.get()) { + QPID_LOG_CAT(info, security, id << " Security layer installed"); + securityLayer->init(&connection); + connection.setSaslSsf(securityLayer->getSsf()); + } + QPID_LOG_CAT(info, security, id << " Authenticated as " << authenticator->getUserid()); + break; + case qpid::SaslServer::FAIL: + completed(false); + state = FAILURE_PENDING; + QPID_LOG_CAT(info, security, id << " Failed to authenticate"); + break; + case qpid::SaslServer::CHALLENGE: + challenge(&chllnge); + QPID_LOG_CAT(info, security, id << " Challenge issued"); + break; + } + haveOutput = true; + out.activateOutput(); +} +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Sasl.h b/cpp/src/qpid/broker/amqp/Sasl.h new file mode 100644 index 0000000000..079128be02 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Sasl.h @@ -0,0 +1,72 @@ +#ifndef QPID_BROKER_AMQP_SASL_H +#define QPID_BROKER_AMQP_SASL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/Connection.h" +#include "qpid/SaslServer.h" +#include "qpid/amqp/SaslServer.h" +#include "qpid/sys/ConnectionCodec.h" +#include <memory> +namespace qpid { +namespace sys { +class SecurityLayer; +} +namespace broker { +namespace amqp { + +/** + * An AMQP 1.0 SASL Security Layer for authentication and optionally + * encryption. + */ +class Sasl : public sys::ConnectionCodec, qpid::amqp::SaslServer +{ + public: + Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, std::auto_ptr<qpid::SaslServer> authenticator); + ~Sasl(); + + size_t decode(const char* buffer, size_t size); + size_t encode(char* buffer, size_t size); + bool canEncode(); + + void closed(); + bool isClosed() const; + + framing::ProtocolVersion getVersion() const; + private: + qpid::sys::OutputControl& out; + Connection connection; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; + std::auto_ptr<qpid::SaslServer> authenticator; + enum { + INCOMPLETE, SUCCESS_PENDING, FAILURE_PENDING, AUTHENTICATED, FAILED + } state; + + bool writeHeader; + bool haveOutput; + + void init(const std::string& mechanism, const std::string* response, const std::string* hostname); + void response(const std::string*); + void respond(qpid::SaslServer::Status status, const std::string& challenge); +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_SASL_H*/ diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp new file mode 100644 index 0000000000..fabe609473 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Session.cpp @@ -0,0 +1,332 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Session.h" +#include "Outgoing.h" +#include "Message.h" +#include "ManagedConnection.h" +#include "qpid/broker/AsyncCompletion.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/DirectExchange.h" +#include "qpid/broker/TopicExchange.h" +#include "qpid/broker/FanOutExchange.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/TopicExchange.h" +#include "qpid/broker/amqp/Filter.h" +#include "qpid/broker/amqp/NodeProperties.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" +#include <boost/intrusive_ptr.hpp> +#include <boost/format.hpp> +#include <map> +#include <sstream> +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace broker { +namespace amqp { + +class Target +{ + public: + Target(pn_link_t* l) : credit(100), window(0), link(l) {} + virtual ~Target() {} + bool flow(); + bool needFlow(); + virtual void handle(qpid::broker::Message& m) = 0;//TODO: revise this for proper message + protected: + const uint32_t credit; + uint32_t window; + pn_link_t* link; +}; + +class Queue : public Target +{ + public: + Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : Target(l), queue(q) {} + void handle(qpid::broker::Message& m); + private: + boost::shared_ptr<qpid::broker::Queue> queue; +}; + +class Exchange : public Target +{ + public: + Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : Target(l), exchange(e) {} + void handle(qpid::broker::Message& m); + private: + boost::shared_ptr<qpid::broker::Exchange> exchange; +}; + +Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o) + : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {} + + +Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus) +{ + ResolvedNode node; + node.exchange = broker.getExchanges().find(name); + node.queue = broker.getQueues().find(name); + if (!node.queue && !node.exchange && pn_terminus_is_dynamic(terminus)) { + //TODO: handle dynamic creation + //is it a queue or an exchange? + NodeProperties properties; + properties.read(pn_terminus_properties(terminus)); + if (properties.isQueue()) { + node.queue = broker.createQueue(name, properties.getQueueSettings(), this, properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first; + } else { + qpid::framing::FieldTable args; + node.exchange = broker.createExchange(name, properties.getExchangeType(), properties.isDurable(), properties.getAlternateExchange(), + args, connection.getUserid(), connection.getId()).first; + } + } else if (node.queue && node.exchange) { + QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue"); + node.exchange.reset(); + } + return node; +} + +void Session::attach(pn_link_t* link) +{ + if (pn_link_is_sender(link)) { + pn_terminus_t* source = pn_link_remote_source(link); + //i.e a subscription + if (pn_terminus_get_type(source) == PN_UNSPECIFIED) { + throw qpid::Exception("No source specified!");/*invalid-field?*/ + } + std::string name = pn_terminus_get_address(source); + QPID_LOG(debug, "Received attach request for outgoing link from " << name); + pn_terminus_set_address(pn_link_source(link), name.c_str()); + + ResolvedNode node = resolve(name, source); + Filter filter; + filter.read(pn_terminus_filter(source)); + + if (node.queue) { + boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, link, *this, out, false)); + q->init(); + if (filter.hasSubjectFilter()) { + q->setSubjectFilter(filter.getSubjectFilter()); + } + senders[link] = q; + } else if (node.exchange) { + QueueSettings settings(false, true); + //TODO: populate settings from source details when available from engine + boost::shared_ptr<qpid::broker::Queue> queue + = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; + if (filter.hasSubjectFilter()) { + filter.bind(node.exchange, queue); + filter.write(pn_terminus_filter(pn_link_source(link))); + } else if (node.exchange->getType() == FanOutExchange::typeName) { + node.exchange->bind(queue, std::string(), 0); + } else if (node.exchange->getType() == TopicExchange::typeName) { + node.exchange->bind(queue, "#", 0); + } else { + throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/ + } + boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, true)); + senders[link] = q; + q->init(); + } else { + pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED); + throw qpid::Exception("Node not found: " + name);/*not-found*/ + } + QPID_LOG(debug, "Outgoing link attached"); + } else { + pn_terminus_t* target = pn_link_remote_target(link); + if (pn_terminus_get_type(target) == PN_UNSPECIFIED) { + throw qpid::Exception("No target specified!");/*invalid field?*/ + } + std::string name = pn_terminus_get_address(target); + QPID_LOG(debug, "Received attach request for incoming link to " << name); + pn_terminus_set_address(pn_link_target(link), name.c_str()); + + ResolvedNode node = resolve(name, target); + + if (node.queue) { + boost::shared_ptr<Target> q(new Queue(node.queue, link)); + targets[link] = q; + } else if (node.exchange) { + boost::shared_ptr<Target> e(new Exchange(node.exchange, link)); + targets[link] = e; + } else { + pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); + throw qpid::Exception("Node not found: " + name);/*not-found*/ + } + QPID_LOG(debug, "Incoming link attached"); + } +} + +void Session::detach(pn_link_t* link) +{ + if (pn_link_is_sender(link)) { + Senders::iterator i = senders.find(link); + if (i != senders.end()) { + i->second->detached(); + senders.erase(i); + QPID_LOG(debug, "Outgoing link detached"); + } + } else { + targets.erase(link); + QPID_LOG(debug, "Incoming link detached"); + } +} +namespace { + class Transfer : public qpid::broker::AsyncCompletion::Callback + { + public: + Transfer(pn_delivery_t* d, boost::shared_ptr<Session> s) : delivery(d), session(s) {} + void completed(bool sync) { session->accepted(delivery, sync); } + boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> clone() + { + boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new Transfer(delivery, session)); + return copy; + } + private: + pn_delivery_t* delivery; + boost::shared_ptr<Session> session; + }; +} + +void Session::accepted(pn_delivery_t* delivery, bool sync) +{ + if (sync) { + //this is on IO thread + pn_delivery_update(delivery, PN_ACCEPTED); + pn_delivery_settle(delivery);//do we need to check settlement modes/orders? + incomingMessageAccepted(); + } else { + //this is not on IO thread, need to delay processing until on IO thread + qpid::sys::Mutex::ScopedLock l(lock); + if (!deleted) { + completed.push_back(delivery); + out.activateOutput(); + } + } +} + +void Session::incoming(pn_link_t* link, pn_delivery_t* delivery) +{ + pn_delivery_tag_t tag = pn_delivery_tag(delivery); + QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size)); + boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery))); + /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize()); + received->scan(); + pn_link_advance(link); + + qpid::broker::Message message(received, received); + + incomingMessageReceived(); + Targets::iterator target = targets.find(link); + if (target == targets.end()) { + QPID_LOG(error, "Received message on unknown link"); + pn_delivery_update(delivery, PN_REJECTED); + pn_delivery_settle(delivery);//do we need to check settlement modes/orders? + incomingMessageRejected(); + } else { + target->second->handle(message); + received->begin(); + Transfer t(delivery, shared_from_this()); + received->end(t); + if (target->second->needFlow()) out.activateOutput(); + } +} +void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery) +{ + Senders::iterator sender = senders.find(link); + if (sender == senders.end()) { + QPID_LOG(error, "Delivery returned for unknown link"); + } else { + sender->second->handle(delivery); + } +} + +bool Session::dispatch() +{ + bool output(false); + for (Senders::iterator s = senders.begin(); s != senders.end(); ++s) { + if (s->second->dispatch()) output = true; + } + if (completed.size()) { + output = true; + std::deque<pn_delivery_t*> copy; + { + qpid::sys::Mutex::ScopedLock l(lock); + completed.swap(copy); + } + for (std::deque<pn_delivery_t*>::iterator i = copy.begin(); i != copy.end(); ++i) { + accepted(*i, true); + } + } + for (Targets::iterator t = targets.begin(); t != targets.end(); ++t) { + if (t->second->flow()) output = true; + } + + return output; +} + +void Session::close() +{ + for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { + i->second->detached(); + } + senders.clear(); + targets.clear();//at present no explicit cleanup required for targets + QPID_LOG(debug, "Session closed, all senders cancelled."); + qpid::sys::Mutex::ScopedLock l(lock); + deleted = true; +} + +void Queue::handle(qpid::broker::Message& message) +{ + queue->deliver(message); + --window; +} + +void Exchange::handle(qpid::broker::Message& message) +{ + DeliverableMessage deliverable(message, 0); + exchange->route(deliverable); + --window; +} + +bool Target::flow() +{ + bool issue = window < credit; + if (issue) { + pn_link_flow(link, credit - window);//TODO: proper flow control + window = credit; + } + return issue; +} + +bool Target::needFlow() +{ + return window <= (credit/2); +} + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Session.h b/cpp/src/qpid/broker/amqp/Session.h new file mode 100644 index 0000000000..7dbdaf05fc --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Session.h @@ -0,0 +1,87 @@ +#ifndef QPID_BROKER_AMQP1_SESSION_H +#define QPID_BROKER_AMQP1_SESSION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Mutex.h" +#include "qpid/sys/OutputControl.h" +#include "qpid/broker/amqp/ManagedSession.h" +#include <deque> +#include <map> +#include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> + +struct pn_delivery_t; +struct pn_link_t; +struct pn_session_t; +struct pn_terminus_t; + +namespace qpid { +namespace broker { + +class Broker; +class Exchange; +class Queue; + +namespace amqp { + +class ManagedConnection; +class Outgoing; +class Target; +/** + * + */ +class Session : public ManagedSession, public boost::enable_shared_from_this<Session> +{ + public: + Session(pn_session_t*, qpid::broker::Broker&, ManagedConnection&, qpid::sys::OutputControl&); + void attach(pn_link_t*); + void detach(pn_link_t*); + void incoming(pn_link_t*, pn_delivery_t*); + void outgoing(pn_link_t*, pn_delivery_t*); + bool dispatch(); + void close(); + + //called when a transfer is completly processed (e.g.including stored on disk) + void accepted(pn_delivery_t*, bool sync); + private: + typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > Senders; + typedef std::map<pn_link_t*, boost::shared_ptr<Target> > Targets; + pn_session_t* session; + qpid::broker::Broker& broker; + ManagedConnection& connection; + qpid::sys::OutputControl& out; + Targets targets; + Senders senders; + std::deque<pn_delivery_t*> completed; + bool deleted; + qpid::sys::Mutex lock; + struct ResolvedNode + { + boost::shared_ptr<qpid::broker::Exchange> exchange; + boost::shared_ptr<qpid::broker::Queue> queue; + }; + + ResolvedNode resolve(const std::string name, pn_terminus_t* terminus); +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP1_SESSION_H*/ diff --git a/cpp/src/qpid/broker/amqp/Translation.cpp b/cpp/src/qpid/broker/amqp/Translation.cpp new file mode 100644 index 0000000000..ca2094b965 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Translation.cpp @@ -0,0 +1,241 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/Translation.h" +#include "qpid/broker/amqp/Outgoing.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/types/Variant.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace broker { +namespace amqp { +namespace { + +const std::string EMPTY; +const std::string FORWARD_SLASH("/"); + +std::string translate(const qpid::framing::ReplyTo r) +{ + if (r.hasExchange()) { + if (r.hasRoutingKey()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey(); + else return r.getExchange(); + } else return r.getRoutingKey(); +} +std::string translate(const qpid::amqp::CharSequence& chars) +{ + if (chars.data && chars.size) return std::string(chars.data, chars.size); + else return EMPTY; +} +bool setMessageId(qpid::framing::MessageProperties& m, const qpid::amqp::CharSequence& chars) +{ + if (chars.data && chars.size) { + if (chars.size == 16) { + m.setMessageId(qpid::framing::Uuid(chars.data)); + return true; + } else { + std::istringstream in(translate(chars)); + qpid::framing::Uuid uuid; + in >> uuid; + if (!in.fail()) { + m.setMessageId(uuid); + return true; + } + } + } + return false; +} +class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties +{ + public: + bool hasMessageId() const { return messageProperties && messageProperties->hasMessageId(); } + std::string getMessageId() const { return messageProperties ? messageProperties->getMessageId().str() : EMPTY; } + bool hasUserId() const { return messageProperties && messageProperties->hasUserId(); } + std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; } + bool hasTo() const { return getDestination().size() || hasSubject(); } + std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); } + bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); } + std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; } + bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); } + std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; } + bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); } + std::string getCorrelationId() const { return messageProperties ? messageProperties->getCorrelationId() : EMPTY; } + bool hasContentType() const { return messageProperties && messageProperties->hasContentType(); } + std::string getContentType() const { return messageProperties ? messageProperties->getContentType() : EMPTY; } + bool hasContentEncoding() const { return messageProperties && messageProperties->hasContentEncoding(); } + std::string getContentEncoding() const { return messageProperties ? messageProperties->getContentEncoding() : EMPTY; } + bool hasAbsoluteExpiryTime() const { return deliveryProperties && deliveryProperties->hasExpiration(); } + int64_t getAbsoluteExpiryTime() const { return deliveryProperties ? deliveryProperties->getExpiration() : 0; } + bool hasCreationTime() const { return false; } + int64_t getCreationTime() const { return 0; } + bool hasGroupId() const {return false; } + std::string getGroupId() const { return EMPTY; } + bool hasGroupSequence() const { return false; } + uint32_t getGroupSequence() const { return 0; } + bool hasReplyToGroupId() const { return false; } + std::string getReplyToGroupId() const { return EMPTY; } + + const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); } + Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t), + messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()), + deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>()) + {} + private: + const qpid::broker::amqp_0_10::MessageTransfer& transfer; + const qpid::framing::MessageProperties* messageProperties; + const qpid::framing::DeliveryProperties* deliveryProperties; + + std::string getDestination() const + { + return transfer.getMethod<qpid::framing::MessageTransferBody>()->getDestination(); + } +}; +} + +Translation::Translation(const qpid::broker::Message& m) : original(m) {} + + +boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer() +{ + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t = + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>(dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding())); + if (t) { + return t;//no translation required + } else { + const Message* message = dynamic_cast<const Message*>(&original.getEncoding()); + if (message) { + //translate 1.0 message into 0-10 + boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); + qpid::framing::AMQFrame method((qpid::framing::MessageTransferBody(qpid::framing::ProtocolVersion(), EMPTY, 0, 0))); + qpid::framing::AMQFrame header((qpid::framing::AMQHeaderBody())); + qpid::framing::AMQFrame content((qpid::framing::AMQContentBody())); + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); + + transfer->getFrames().append(method); + transfer->getFrames().append(header); + + qpid::amqp::CharSequence body = message->getBody(); + content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size); + transfer->getFrames().append(content); + + qpid::framing::MessageProperties* props = + transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true); + props->setContentLength(body.size); + + qpid::amqp::MessageId mid = message->getMessageId(); + qpid::framing::Uuid uuid; + switch (mid.type) { + case qpid::amqp::MessageId::UUID: + case qpid::amqp::MessageId::BYTES: + if (mid.value.bytes.size == 0) break; + if (setMessageId(*props, mid.value.bytes)) break; + case qpid::amqp::MessageId::ULONG: + QPID_LOG(info, "Skipping message id in translation from 1.0 to 0-10 as it is not a UUID"); + break; + } + + qpid::amqp::MessageId cid = message->getCorrelationId(); + switch (cid.type) { + case qpid::amqp::MessageId::UUID: + assert(cid.value.bytes.size = 16); + props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str()); + break; + case qpid::amqp::MessageId::BYTES: + if (cid.value.bytes.size) { + props->setCorrelationId(translate(cid.value.bytes)); + } + break; + case qpid::amqp::MessageId::ULONG: + props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong)); + break; + } + // TODO: ReplyTo - there is no way to reliably determine + // the type of the node from just its name, unless we + // query the brokers registries + + if (message->getContentType()) props->setContentType(translate(message->getContentType())); + if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding())); + props->setUserId(message->getUserId()); + // TODO: FieldTable applicationHeaders; + qpid::amqp::CharSequence ap = message->getApplicationProperties(); + if (ap) { + qpid::amqp::Decoder d(ap.data, ap.size); + qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders()); + } + + qpid::framing::DeliveryProperties* dp = + transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true); + dp->setPriority(message->getPriority()); + if (message->isPersistent()) dp->setDeliveryMode(2); + if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey()); + + return transfer.get(); + } else { + throw qpid::Exception("Could not write message data in AMQP 0-10 format"); + } + } +} + +void Translation::write(Outgoing& out) +{ + const Message* message = dynamic_cast<const Message*>(&original.getEncoding()); + if (message) { + //write annotations + //TODO: merge in any newly added annotations + qpid::amqp::CharSequence deliveryAnnotations = message->getDeliveryAnnotations(); + qpid::amqp::CharSequence messageAnnotations = message->getMessageAnnotations(); + if (deliveryAnnotations.size) out.write(deliveryAnnotations.data, deliveryAnnotations.size); + if (messageAnnotations.size) out.write(messageAnnotations.data, messageAnnotations.size); + //write bare message + qpid::amqp::CharSequence bareMessage = message->getBareMessage(); + if (bareMessage.size) out.write(bareMessage.data, bareMessage.size); + //write footer: + qpid::amqp::CharSequence footer = message->getFooter(); + if (footer.size) out.write(footer.data, footer.size); + } else { + const qpid::broker::amqp_0_10::MessageTransfer* transfer = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding()); + if (transfer) { + Properties_0_10 properties(*transfer); + qpid::types::Variant::Map applicationProperties; + qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties); + std::string content = transfer->getContent(); + size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content); + std::vector<char> buffer(size); + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeProperties(properties); + encoder.writeApplicationProperties(applicationProperties); + encoder.writeBinary(content, &qpid::amqp::message::DATA); + out.write(&buffer[0], encoder.getPosition()); + } else { + QPID_LOG(error, "Could not write message data in AMQP 1.0 format"); + } + } +} + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Translation.h b/cpp/src/qpid/broker/amqp/Translation.h new file mode 100644 index 0000000000..64d96560e3 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Translation.h @@ -0,0 +1,58 @@ +#ifndef QPID_BROKER_AMQP_TRANSLATION_H +#define QPID_BROKER_AMQP_TRANSLATION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace broker { +class Message; +namespace amqp_0_10 { +class MessageTransfer; +} +namespace amqp { + +class Outgoing; +/** + * + */ +class Translation +{ + public: + Translation(const qpid::broker::Message& message); + + /** + * @returns a pointer to an AMQP 0-10 message transfer suitable + * for sending on an 0-10 session, translating from 1.0 as + * necessary + */ + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> getTransfer(); + /** + * Writes the AMQP 1.0 bare message and any annotations, translating from 0-10 if necessary + */ + void write(Outgoing&); + private: + const qpid::broker::Message& original; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_TRANSLATION_H*/ |