summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/amqp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/amqp')
-rw-r--r--cpp/src/qpid/broker/amqp/Connection.cpp247
-rw-r--r--cpp/src/qpid/broker/amqp/Connection.h73
-rw-r--r--cpp/src/qpid/broker/amqp/DataReader.cpp187
-rw-r--r--cpp/src/qpid/broker/amqp/DataReader.h53
-rw-r--r--cpp/src/qpid/broker/amqp/Filter.cpp150
-rw-r--r--cpp/src/qpid/broker/amqp/Filter.h63
-rw-r--r--cpp/src/qpid/broker/amqp/Header.cpp65
-rw-r--r--cpp/src/qpid/broker/amqp/Header.h50
-rw-r--r--cpp/src/qpid/broker/amqp/ManagedConnection.cpp98
-rw-r--r--cpp/src/qpid/broker/amqp/ManagedConnection.h59
-rw-r--r--cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp70
-rw-r--r--cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h53
-rw-r--r--cpp/src/qpid/broker/amqp/ManagedSession.cpp88
-rw-r--r--cpp/src/qpid/broker/amqp/ManagedSession.h59
-rw-r--r--cpp/src/qpid/broker/amqp/Message.cpp264
-rw-r--r--cpp/src/qpid/broker/amqp/Message.h148
-rw-r--r--cpp/src/qpid/broker/amqp/NodeProperties.cpp179
-rw-r--r--cpp/src/qpid/broker/amqp/NodeProperties.h71
-rw-r--r--cpp/src/qpid/broker/amqp/Outgoing.cpp244
-rw-r--r--cpp/src/qpid/broker/amqp/Outgoing.h108
-rw-r--r--cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp117
-rw-r--r--cpp/src/qpid/broker/amqp/Sasl.cpp167
-rw-r--r--cpp/src/qpid/broker/amqp/Sasl.h72
-rw-r--r--cpp/src/qpid/broker/amqp/Session.cpp332
-rw-r--r--cpp/src/qpid/broker/amqp/Session.h87
-rw-r--r--cpp/src/qpid/broker/amqp/Translation.cpp241
-rw-r--r--cpp/src/qpid/broker/amqp/Translation.h58
27 files changed, 3403 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/amqp/Connection.cpp b/cpp/src/qpid/broker/amqp/Connection.cpp
new file mode 100644
index 0000000000..1f135cf931
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Connection.h"
+#include "Session.h"
+#include "qpid/Exception.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/OutputControl.h"
+#include <sstream>
+extern "C" {
+#include <proton/engine.h>
+#include <proton/error.h>
+}
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, bool saslInUse)
+ : ManagedConnection(b, i),
+ connection(pn_connection()),
+ transport(pn_transport()),
+ out(o), id(i), broker(b), haveOutput(true)
+{
+ if (pn_transport_bind(transport, connection)) {
+ //error
+ }
+ out.activateOutput();
+ bool enableTrace(false);
+ QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
+ if (enableTrace) pn_transport_trace(transport, PN_TRACE_FRM);
+
+ if (!saslInUse) {
+ //feed in a dummy AMQP 1.0 header as engine expects one, but
+ //we already read it (if sasl is in use we read the sasl
+ //header,not the AMQP 1.0 header).
+ std::vector<char> protocolHeader(8);
+ qpid::framing::ProtocolInitiation pi(getVersion());
+ qpid::framing::Buffer buffer(&protocolHeader[0], protocolHeader.size());
+ pi.encode(buffer);
+ pn_transport_input(transport, &protocolHeader[0], protocolHeader.size());
+
+ //wont get a userid, so set a dummy one on the ManagedConnection to trigger event
+ setUserid("no authentication used");
+ }
+}
+
+
+Connection::~Connection()
+{
+
+ pn_transport_free(transport);
+ pn_connection_free(connection);
+}
+
+pn_transport_t* Connection::getTransport()
+{
+ return transport;
+}
+size_t Connection::decode(const char* buffer, size_t size)
+{
+ QPID_LOG(trace, id << " decode(" << size << ")")
+ //TODO: Fix pn_engine_input() to take const buffer
+ ssize_t n = pn_transport_input(transport, const_cast<char*>(buffer), size);
+ if (n > 0 || n == PN_EOS) {
+ //If engine returns EOS, have no way of knowing how many bytes
+ //it processed, but can assume none need to be reprocessed so
+ //consider them all read:
+ if (n == PN_EOS) n = size;
+ QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size)
+ process();
+ pn_transport_tick(transport, 0);
+ if (!haveOutput) {
+ haveOutput = true;
+ out.activateOutput();
+ }
+ return n;
+ } else if (n == PN_ERR) {
+ throw qpid::Exception(QPID_MSG("Error on input: " << getError()));
+ } else {
+ return 0;
+ }
+}
+
+size_t Connection::encode(char* buffer, size_t size)
+{
+ QPID_LOG(trace, "encode(" << size << ")")
+ ssize_t n = pn_transport_output(transport, buffer, size);
+ if (n > 0) {
+ QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
+ haveOutput = true;
+ return n;
+ } else if (n == PN_EOS) {
+ haveOutput = size;
+ return size;//Is this right?
+ } else if (n == PN_ERR) {
+ throw qpid::Exception(QPID_MSG("Error on output: " << getError()));
+ } else {
+ haveOutput = false;
+ return 0;
+ }
+}
+bool Connection::canEncode()
+{
+ for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) {
+ if (i->second->dispatch()) haveOutput = true;
+ }
+ process();
+ //TODO: proper handling of time in and out of tick
+ pn_transport_tick(transport, 0);
+ QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
+ return haveOutput;
+}
+void Connection::closed()
+{
+ //TODO: tear down sessions and associated links
+ for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ i->second->close();
+ }
+}
+bool Connection::isClosed() const
+{
+ return pn_connection_state(connection) & PN_REMOTE_CLOSED;
+}
+framing::ProtocolVersion Connection::getVersion() const
+{
+ return qpid::framing::ProtocolVersion(1,0);
+}
+namespace {
+pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE;
+pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
+}
+
+void Connection::process()
+{
+ QPID_LOG(trace, id << " process()");
+ if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
+ QPID_LOG_CAT(debug, model, id << " connection opened");
+ pn_connection_set_container(connection, broker.getFederationTag().c_str());
+ pn_connection_open(connection);
+ }
+
+ for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) {
+ QPID_LOG_CAT(debug, model, id << " session begun");
+ pn_session_open(s);
+ boost::shared_ptr<Session> ssn(new Session(s, broker, *this, out));
+ sessions[s] = ssn;
+ }
+ for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) {
+ pn_link_open(l);
+
+ Sessions::iterator session = sessions.find(pn_link_session(l));
+ if (session == sessions.end()) {
+ QPID_LOG(error, id << " Link attached on unknown session!");
+ } else {
+ try {
+ session->second->attach(l);
+ QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l));
+ } catch (const std::exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+ //TODO: set error details on detach when that is exposed via engine API
+ pn_link_close(l);
+ }
+ }
+ }
+
+ //handle deliveries
+ for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) {
+ pn_link_t* link = pn_delivery_link(delivery);
+ if (pn_link_is_receiver(link)) {
+ Sessions::iterator i = sessions.find(pn_link_session(link));
+ if (i != sessions.end()) {
+ i->second->incoming(link, delivery);
+ } else {
+ pn_delivery_update(delivery, PN_REJECTED);
+ }
+ } else { //i.e. SENDER
+ Sessions::iterator i = sessions.find(pn_link_session(link));
+ if (i != sessions.end()) {
+ QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link));
+ i->second->outgoing(link, delivery);
+ } else {
+ QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link);
+ }
+ }
+ }
+
+
+ for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) {
+ pn_link_close(l);
+ Sessions::iterator session = sessions.find(pn_link_session(l));
+ if (session == sessions.end()) {
+ QPID_LOG(error, id << " peer attempted to detach link on unknown session!");
+ } else {
+ session->second->detach(l);
+ QPID_LOG_CAT(debug, model, id << " link detached");
+ }
+ }
+ for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) {
+ pn_session_close(s);
+ Sessions::iterator i = sessions.find(s);
+ if (i != sessions.end()) {
+ i->second->close();
+ sessions.erase(i);
+ QPID_LOG_CAT(debug, model, id << " session ended");
+ } else {
+ QPID_LOG(error, id << " peer attempted to close unrecognised session");
+ }
+ }
+ if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ QPID_LOG_CAT(debug, model, id << " connection closed");
+ pn_connection_close(connection);
+ }
+}
+
+std::string Connection::getError()
+{
+ std::stringstream text;
+ pn_error_t* cerror = pn_connection_error(connection);
+ if (cerror) text << "connection error " << pn_error_text(cerror);
+ pn_error_t* terror = pn_transport_error(transport);
+ if (terror) text << "transport error " << pn_error_text(terror);
+ return text.str();
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Connection.h b/cpp/src/qpid/broker/amqp/Connection.h
new file mode 100644
index 0000000000..8af209af7a
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Connection.h
@@ -0,0 +1,73 @@
+#ifndef QPID_BROKER_AMQP1_CONNECTION_H
+#define QPID_BROKER_AMQP1_CONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/broker/amqp/ManagedConnection.h"
+#include <map>
+#include <boost/shared_ptr.hpp>
+
+struct pn_connection_t;
+struct pn_session_t;
+struct pn_transport_t;
+
+namespace qpid {
+namespace broker {
+
+class Broker;
+
+namespace amqp {
+
+class Session;
+/**
+ * AMQP 1.0 protocol support for broker
+ */
+class Connection : public sys::ConnectionCodec, public ManagedConnection
+{
+ public:
+ Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse);
+ ~Connection();
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(char* buffer, size_t size);
+ bool canEncode();
+
+ void closed();
+ bool isClosed() const;
+
+ framing::ProtocolVersion getVersion() const;
+ pn_transport_t* getTransport();
+ private:
+ typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
+ pn_connection_t* connection;
+ pn_transport_t* transport;
+ qpid::sys::OutputControl& out;
+ const std::string id;
+ qpid::broker::Broker& broker;
+ bool haveOutput;
+ Sessions sessions;
+
+ void process();
+ std::string getError();
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP1_CONNECTION_H*/
diff --git a/cpp/src/qpid/broker/amqp/DataReader.cpp b/cpp/src/qpid/broker/amqp/DataReader.cpp
new file mode 100644
index 0000000000..519dd71c9c
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/DataReader.cpp
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DataReader.h"
+#include "qpid/amqp/CharSequence.h"
+#include "qpid/amqp/Descriptor.h"
+#include "qpid/log/Statement.h"
+#include <string>
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+qpid::amqp::CharSequence convert(pn_bytes_t in)
+{
+ qpid::amqp::CharSequence out;
+ out.data = in.start;
+ out.size = in.size;
+ return out;
+}
+
+qpid::amqp::CharSequence convert(pn_uuid_t in)
+{
+ qpid::amqp::CharSequence out;
+ out.data = in.bytes;
+ out.size = 16;
+ return out;
+}
+}
+
+DataReader::DataReader(qpid::amqp::Reader& r) : reader(r) {}
+
+void DataReader::read(pn_data_t* data)
+{
+ /*
+ while (pn_data_next(data)) {
+ readOne(data);
+ }
+ */
+ do {
+ readOne(data);
+ } while (pn_data_next(data));
+}
+void DataReader::readOne(pn_data_t* data)
+{
+ qpid::amqp::Descriptor descriptor(0);
+ bool described = pn_data_is_described(data);
+ if (described) {
+ pn_data_enter(data);
+ pn_data_next(data);
+ if (pn_data_type(data) == PN_ULONG) {
+ descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(data));
+ } else if (pn_data_type(data) == PN_SYMBOL) {
+ descriptor = qpid::amqp::Descriptor(convert(pn_data_get_symbol(data)));
+ } else {
+ QPID_LOG(notice, "Ignoring descriptor of type " << pn_data_type(data));
+ }
+ pn_data_next(data);
+ }
+ switch (pn_data_type(data)) {
+ case PN_NULL:
+ reader.onNull(described ? &descriptor : 0);
+ break;
+ case PN_BOOL:
+ reader.onBoolean(pn_data_get_bool(data), described ? &descriptor : 0);
+ break;
+ case PN_UBYTE:
+ reader.onUByte(pn_data_get_ubyte(data), described ? &descriptor : 0);
+ break;
+ case PN_BYTE:
+ reader.onByte(pn_data_get_byte(data), described ? &descriptor : 0);
+ break;
+ case PN_USHORT:
+ reader.onUShort(pn_data_get_ushort(data), described ? &descriptor : 0);
+ break;
+ case PN_SHORT:
+ reader.onShort(pn_data_get_short(data), described ? &descriptor : 0);
+ break;
+ case PN_UINT:
+ reader.onUInt(pn_data_get_uint(data), described ? &descriptor : 0);
+ break;
+ case PN_INT:
+ reader.onInt(pn_data_get_int(data), described ? &descriptor : 0);
+ break;
+ case PN_CHAR:
+ pn_data_get_char(data);
+ break;
+ case PN_ULONG:
+ reader.onULong(pn_data_get_ulong(data), described ? &descriptor : 0);
+ break;
+ case PN_LONG:
+ reader.onLong(pn_data_get_long(data), described ? &descriptor : 0);
+ break;
+ case PN_TIMESTAMP:
+ reader.onTimestamp(pn_data_get_timestamp(data), described ? &descriptor : 0);
+ break;
+ case PN_FLOAT:
+ reader.onFloat(pn_data_get_float(data), described ? &descriptor : 0);
+ break;
+ case PN_DOUBLE:
+ reader.onDouble(pn_data_get_double(data), described ? &descriptor : 0);
+ break;
+ case PN_DECIMAL32:
+ pn_data_get_decimal32(data);
+ break;
+ case PN_DECIMAL64:
+ pn_data_get_decimal64(data);
+ break;
+ case PN_DECIMAL128:
+ pn_data_get_decimal128(data);
+ break;
+ case PN_UUID:
+ reader.onUuid(convert(pn_data_get_uuid(data)), described ? &descriptor : 0);
+ break;
+ case PN_BINARY:
+ reader.onBinary(convert(pn_data_get_binary(data)), described ? &descriptor : 0);
+ break;
+ case PN_STRING:
+ reader.onString(convert(pn_data_get_string(data)), described ? &descriptor : 0);
+ break;
+ case PN_SYMBOL:
+ reader.onSymbol(convert(pn_data_get_symbol(data)), described ? &descriptor : 0);
+ break;
+ case PN_DESCRIBED:
+ break;
+ case PN_ARRAY:
+ readArray(data, described ? &descriptor : 0);
+ break;
+ case PN_LIST:
+ readList(data, described ? &descriptor : 0);
+ break;
+ case PN_MAP:
+ readMap(data, described ? &descriptor : 0);
+ break;
+ }
+ if (described) pn_data_exit(data);
+}
+
+void DataReader::readArray(pn_data_t* /*data*/, const qpid::amqp::Descriptor* /*descriptor*/)
+{
+ //not yet implemented
+}
+
+void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
+{
+ size_t count = pn_data_get_list(data);
+ reader.onStartList(count, qpid::amqp::CharSequence(), descriptor);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ read(data);
+ }
+ pn_data_exit(data);
+ reader.onEndList(count, descriptor);
+}
+
+void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
+{
+ size_t count = pn_data_get_map(data);
+ reader.onStartMap(count, qpid::amqp::CharSequence(), descriptor);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ read(data);
+ }
+ pn_data_exit(data);
+ reader.onEndMap(count, descriptor);
+}
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/DataReader.h b/cpp/src/qpid/broker/amqp/DataReader.h
new file mode 100644
index 0000000000..024507e7f2
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/DataReader.h
@@ -0,0 +1,53 @@
+#ifndef QPID_BROKER_AMQP_DATAREADER_H
+#define QPID_BROKER_AMQP_DATAREADER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp/Reader.h"
+
+struct pn_data_t;
+
+namespace qpid {
+namespace amqp {
+struct Descriptor;
+}
+namespace broker {
+namespace amqp {
+
+/**
+ * Allows use of Reader interface to read pn_data_t* data.
+ */
+class DataReader
+{
+ public:
+ DataReader(qpid::amqp::Reader& reader);
+ void read(pn_data_t*);
+ private:
+ qpid::amqp::Reader& reader;
+
+ void readOne(pn_data_t*);
+ void readMap(pn_data_t*, const qpid::amqp::Descriptor*);
+ void readList(pn_data_t*, const qpid::amqp::Descriptor*);
+ void readArray(pn_data_t*, const qpid::amqp::Descriptor*);
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_DATAREADER_H*/
diff --git a/cpp/src/qpid/broker/amqp/Filter.cpp b/cpp/src/qpid/broker/amqp/Filter.cpp
new file mode 100644
index 0000000000..38baba0df1
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Filter.cpp
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Filter.h"
+#include "qpid/broker/amqp/DataReader.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/TopicExchange.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/log/Statement.h"
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+void Filter::read(pn_data_t* data)
+{
+ try {
+ DataReader reader(*this);
+ reader.read(data);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Error parsing filter: " << e.what());
+ }
+}
+
+void Filter::write(pn_data_t* data)
+{
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ subjectFilter.write(data);
+ pn_data_exit(data);
+}
+
+void Filter::onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor)
+{
+ StringFilter filter;
+ filter.key = std::string(key.data, key.size);
+ filter.value = std::string(value.data, value.size);
+ if (descriptor) {
+ filter.described = true;
+ filter.descriptor = *descriptor;
+ if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)
+ || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) {
+ setSubjectFilter(filter);
+ } else {
+ QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key << " and descriptor " << filter.descriptor);
+ }
+ } else {
+ setSubjectFilter(filter);
+ }
+}
+
+bool Filter::hasSubjectFilter() const
+{
+ return !subjectFilter.value.empty();
+}
+
+std::string Filter::getSubjectFilter() const
+{
+ return subjectFilter.value;
+}
+
+
+void Filter::setSubjectFilter(const StringFilter& filter)
+{
+ if (hasSubjectFilter()) {
+ QPID_LOG(notice, "Skipping filter with key " << filter.key << "; subject filter already set");
+ } else {
+ subjectFilter = filter;
+ }
+}
+
+void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
+{
+ subjectFilter.bind(exchange, queue);
+}
+
+Filter::StringFilter::StringFilter() : described(false), descriptor(0) {}
+namespace {
+pn_bytes_t convert(const std::string& in)
+{
+ pn_bytes_t out;
+ out.start = const_cast<char*>(in.data());
+ out.size = in.size();
+ return out;
+}
+pn_bytes_t convert(const qpid::amqp::CharSequence& in)
+{
+ pn_bytes_t out;
+ out.start = const_cast<char*>(in.data);
+ out.size = in.size;
+ return out;
+}
+}
+void Filter::StringFilter::write(pn_data_t* data)
+{
+ pn_data_put_symbol(data, convert(key));
+ if (described) {
+ pn_data_put_described(data);
+ pn_data_enter(data);
+ switch (descriptor.type) {
+ case qpid::amqp::Descriptor::NUMERIC:
+ pn_data_put_ulong(data, descriptor.value.code);
+ break;
+ case qpid::amqp::Descriptor::SYMBOLIC:
+ pn_data_put_symbol(data, convert(descriptor.value.symbol));
+ break;
+ }
+ }
+ pn_data_put_string(data, convert(value));
+ if (described) pn_data_exit(data);
+}
+
+void Filter::StringFilter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
+{
+ if (described && exchange->getType() == DirectExchange::typeName
+ && descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) {
+ QPID_LOG(info, "Using legacy topic filter as a direct matching filter for " << exchange->getName());
+ if (descriptor.type == qpid::amqp::Descriptor::NUMERIC) {
+ descriptor = qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE);
+ } else {
+ qpid::amqp::CharSequence symbol;
+ symbol.data = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.data();
+ symbol.size = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.size();
+ descriptor = qpid::amqp::Descriptor(symbol);
+ }
+ }
+ exchange->bind(queue, value, 0);
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Filter.h b/cpp/src/qpid/broker/amqp/Filter.h
new file mode 100644
index 0000000000..20cceb372a
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Filter.h
@@ -0,0 +1,63 @@
+#ifndef QPID_BROKER_AMQP_FILTER_H
+#define QPID_BROKER_AMQP_FILTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp/MapReader.h"
+#include "qpid/amqp/Descriptor.h"
+#include <boost/shared_ptr.hpp>
+
+struct pn_data_t;
+namespace qpid {
+namespace broker {
+class Exchange;
+class Queue;
+namespace amqp {
+
+
+class Filter : qpid::amqp::MapReader
+{
+ public:
+ void read(pn_data_t*);
+ void write(pn_data_t*);
+ bool hasSubjectFilter() const;
+ std::string getSubjectFilter() const;
+ void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue);
+ private:
+ struct StringFilter
+ {
+ bool described;
+ qpid::amqp::Descriptor descriptor;
+ std::string key;
+ std::string value;
+ StringFilter();
+ void write(pn_data_t*);
+ void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue);
+ };
+
+ void onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor);
+ void setSubjectFilter(const StringFilter&);
+
+ StringFilter subjectFilter;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_FILTER_H*/
diff --git a/cpp/src/qpid/broker/amqp/Header.cpp b/cpp/src/qpid/broker/amqp/Header.cpp
new file mode 100644
index 0000000000..493e757a56
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Header.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Header.h"
+#include "qpid/broker/Message.h"
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+bool Header::isDurable() const
+{
+ return message.isPersistent();
+}
+
+uint8_t Header::getPriority() const
+{
+ return message.getPriority();
+}
+
+bool Header::hasTtl() const
+{
+ uint64_t dummy(0);
+ return message.getTtl(dummy);
+}
+
+uint32_t Header::getTtl() const
+{
+ uint64_t ttl(0);
+ message.getTtl(ttl);
+ if (ttl > std::numeric_limits<uint32_t>::max()) return std::numeric_limits<uint32_t>::max();
+ else return (uint32_t) ttl;
+}
+
+bool Header::isFirstAcquirer() const
+{
+ return false;//TODO
+}
+
+uint32_t Header::getDeliveryCount() const
+{
+ return message.getDeliveryCount();
+}
+
+Header::Header(const qpid::broker::Message& m) : message(m) {}
+
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Header.h b/cpp/src/qpid/broker/amqp/Header.h
new file mode 100644
index 0000000000..6e4f763028
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Header.h
@@ -0,0 +1,50 @@
+#ifndef QPID_BROKER_AMQP_HEADER_H
+#define QPID_BROKER_AMQP_HEADER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp/MessageEncoder.h"
+
+namespace qpid {
+namespace broker {
+class Message;
+namespace amqp {
+
+/**
+ * Adapts the broker current message abstraction to provide that
+ * required by the AMQP 1.0 message encoder.
+ */
+class Header : public qpid::amqp::MessageEncoder::Header
+{
+ public:
+ Header(const qpid::broker::Message&);
+ bool isDurable() const;
+ uint8_t getPriority() const;
+ bool hasTtl() const;
+ uint32_t getTtl() const;
+ bool isFirstAcquirer() const;
+ uint32_t getDeliveryCount() const;
+ private:
+ const qpid::broker::Message& message;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_HEADER_H*/
diff --git a/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
new file mode 100644
index 0000000000..0253ba5552
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/ManagedConnection.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/log/Statement.h"
+#include "qmf/org/apache/qpid/broker/EventClientConnect.h"
+#include "qmf/org/apache/qpid/broker/EventClientDisconnect.h"
+
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+ManagedConnection::ManagedConnection(Broker& broker, const std::string i) : id(i), agent(0)
+{
+ //management integration:
+ agent = broker.getManagementAgent();
+ if (agent != 0) {
+ qpid::management::Manageable* parent = broker.GetVhostObject();
+ // TODO set last bool true if system connection
+ connection = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, id, true, false, "AMQP 1.0"));
+ connection->set_shadow(false);
+ agent->addObject(connection);
+ }
+}
+
+ManagedConnection::~ManagedConnection()
+{
+ if (agent && connection) {
+ agent->raiseEvent(_qmf::EventClientDisconnect(id, userid, connection->get_remoteProperties()));
+ connection->resourceDestroy();
+ }
+ QPID_LOG_CAT(debug, model, "Delete connection. user:" << userid << " rhost:" << id);
+}
+
+void ManagedConnection::setUserid(const std::string& uid)
+{
+ userid = uid;
+ if (agent && connection) {
+ connection->set_authIdentity(userid);
+ agent->raiseEvent(_qmf::EventClientConnect(id, userid, connection->get_remoteProperties()));
+ }
+ QPID_LOG_CAT(debug, model, "Create connection. user:" << userid << " rhost:" << id );
+}
+
+void ManagedConnection::setSaslMechanism(const std::string& mechanism)
+{
+ connection->set_saslMechanism(mechanism);
+}
+
+void ManagedConnection::setSaslSsf(int ssf)
+{
+ connection->set_saslSsf(ssf);
+}
+
+qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const
+{
+ return connection;
+}
+
+std::string ManagedConnection::getId() const { return id; }
+std::string ManagedConnection::getUserid() const { return userid; }
+
+bool ManagedConnection::isLocal(const ConnectionToken* t) const
+{
+ return this == t;
+}
+void ManagedConnection::outgoingMessageSent()
+{
+ if (connection) connection->inc_msgsToClient();
+}
+
+void ManagedConnection::incomingMessageReceived()
+{
+ if (connection) connection->inc_msgsFromClient();
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/ManagedConnection.h b/cpp/src/qpid/broker/amqp/ManagedConnection.h
new file mode 100644
index 0000000000..e2d0376918
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/ManagedConnection.h
@@ -0,0 +1,59 @@
+#ifndef QPID_BROKER_AMQP_MANAGEDCONNECTION_H
+#define QPID_BROKER_AMQP_MANAGEDCONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/management/Manageable.h"
+#include "qpid/broker/ConnectionToken.h"
+#include "qmf/org/apache/qpid/broker/Connection.h"
+
+namespace qpid {
+namespace management {
+class ManagementAgent;
+class ManagementObject;
+}
+namespace broker {
+class Broker;
+namespace amqp {
+
+class ManagedConnection : public qpid::management::Manageable, public ConnectionToken
+{
+ public:
+ ManagedConnection(Broker& broker, const std::string id);
+ virtual ~ManagedConnection();
+ void setUserid(const std::string&);
+ std::string getId() const;
+ std::string getUserid() const;
+ void setSaslMechanism(const std::string&);
+ void setSaslSsf(int);
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
+ bool isLocal(const ConnectionToken* t) const;
+ void incomingMessageReceived();
+ void outgoingMessageSent();
+ private:
+ const std::string id;
+ std::string userid;
+ qmf::org::apache::qpid::broker::Connection::shared_ptr connection;
+ qpid::management::ManagementAgent* agent;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_MANAGEDCONNECTION_H*/
diff --git a/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
new file mode 100644
index 0000000000..f36a1e8da4
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ManagedOutgoingLink.h"
+#include "qpid/broker/amqp/ManagedSession.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/types/Variant.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/log/Statement.h"
+
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, Queue& q, ManagedSession& p, const std::string i, bool topic)
+ : parent(p), id(i)
+{
+ qpid::management::ManagementAgent* agent = broker.getManagementAgent();
+ if (agent) {
+ subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObject()->getObjectId(), id,
+ false/*FIXME*/, true/*FIXME*/, topic, qpid::types::Variant::Map()));
+ agent->addObject(subscription);
+ subscription->set_creditMode("n/a");
+ }
+}
+ManagedOutgoingLink::~ManagedOutgoingLink()
+{
+ if (subscription != 0) subscription->resourceDestroy();
+}
+
+qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObject() const
+{
+ return subscription;
+}
+
+void ManagedOutgoingLink::outgoingMessageSent()
+{
+ if (subscription) { subscription->inc_delivered(); }
+ parent.outgoingMessageSent();
+}
+void ManagedOutgoingLink::outgoingMessageAccepted()
+{
+ parent.outgoingMessageAccepted();
+}
+void ManagedOutgoingLink::outgoingMessageRejected()
+{
+ parent.outgoingMessageRejected();
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h b/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
new file mode 100644
index 0000000000..20a1095db2
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
@@ -0,0 +1,53 @@
+#ifndef QPID_BROKER_AMQP_MANAGEDOUTGOINGLINK_H
+#define QPID_BROKER_AMQP_MANAGEDOUTGOINGLINK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/broker/Subscription.h"
+
+namespace qpid {
+namespace management {
+class ManagementObject;
+}
+namespace broker {
+class Broker;
+class Queue;
+namespace amqp {
+class ManagedSession;
+
+class ManagedOutgoingLink : public qpid::management::Manageable
+{
+ public:
+ ManagedOutgoingLink(Broker& broker, Queue&, ManagedSession& parent, const std::string id, bool topic);
+ virtual ~ManagedOutgoingLink();
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
+ void outgoingMessageSent();
+ void outgoingMessageAccepted();
+ void outgoingMessageRejected();
+ private:
+ ManagedSession& parent;
+ const std::string id;
+ qmf::org::apache::qpid::broker::Subscription::shared_ptr subscription;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_MANAGEDOUTGOINGLINK_H*/
diff --git a/cpp/src/qpid/broker/amqp/ManagedSession.cpp b/cpp/src/qpid/broker/amqp/ManagedSession.cpp
new file mode 100644
index 0000000000..9bef0e842b
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/ManagedSession.cpp
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/ManagedSession.h"
+#include "qpid/broker/amqp/ManagedConnection.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/log/Statement.h"
+
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+ManagedSession::ManagedSession(Broker& broker, ManagedConnection& p, const std::string i) : parent(p), id(i), unacked(0)
+{
+ qpid::management::ManagementAgent* agent = broker.getManagementAgent();
+ if (agent != 0) {
+ session = _qmf::Session::shared_ptr(new _qmf::Session(agent, this, broker.GetVhostObject(), id));
+ session->set_attached(true);
+ session->set_detachedLifespan(0);
+ session->clr_expireTime();
+ session->set_connectionRef(parent.GetManagementObject()->getObjectId());
+ agent->addObject(session);
+ }
+}
+
+ManagedSession::~ManagedSession()
+{
+ if (session) session->resourceDestroy();
+}
+
+qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObject() const
+{
+ return session;
+}
+
+bool ManagedSession::isLocal(const ConnectionToken* t) const
+{
+ return &parent == t;
+}
+
+void ManagedSession::outgoingMessageSent()
+{
+ if (session) session->set_unackedMessages(++unacked);
+ parent.outgoingMessageSent();
+}
+void ManagedSession::outgoingMessageAccepted()
+{
+ if (session) session->set_unackedMessages(--unacked);
+}
+void ManagedSession::outgoingMessageRejected()
+{
+ if (session) session->set_unackedMessages(--unacked);
+}
+
+void ManagedSession::incomingMessageReceived()
+{
+ parent.incomingMessageReceived();
+}
+void ManagedSession::incomingMessageAccepted()
+{
+
+}
+void ManagedSession::incomingMessageRejected()
+{
+
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/ManagedSession.h b/cpp/src/qpid/broker/amqp/ManagedSession.h
new file mode 100644
index 0000000000..1f56964bb6
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/ManagedSession.h
@@ -0,0 +1,59 @@
+#ifndef QPID_BROKER_AMQP_MANAGEDSESSION_H
+#define QPID_BROKER_AMQP_MANAGEDSESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/broker/Session.h"
+#include "qpid/broker/ConnectionToken.h"
+#include "qpid/broker/OwnershipToken.h"
+
+namespace qpid {
+namespace management {
+class ManagementObject;
+}
+namespace broker {
+class Broker;
+namespace amqp {
+class ManagedConnection;
+
+class ManagedSession : public qpid::management::Manageable, public OwnershipToken
+{
+ public:
+ ManagedSession(Broker& broker, ManagedConnection& parent, const std::string id);
+ virtual ~ManagedSession();
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
+ bool isLocal(const ConnectionToken* t) const;
+ void incomingMessageReceived();
+ void incomingMessageAccepted();
+ void incomingMessageRejected();
+ void outgoingMessageSent();
+ void outgoingMessageAccepted();
+ void outgoingMessageRejected();
+ private:
+ ManagedConnection& parent;
+ const std::string id;
+ qmf::org::apache::qpid::broker::Session::shared_ptr session;
+ size_t unacked;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_MANAGEDSESSION_H*/
diff --git a/cpp/src/qpid/broker/amqp/Message.cpp b/cpp/src/qpid/broker/amqp/Message.cpp
new file mode 100644
index 0000000000..a4c346e131
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Message.cpp
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Message.h"
+#include "qpid/amqp/Decoder.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/Buffer.h"
+#include <string.h>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+namespace {
+std::string empty;
+}
+
+std::string Message::getRoutingKey() const
+{
+ std::string v;
+ v.assign(subject.data, subject.size);
+ return v;
+}
+std::string Message::getUserId() const
+{
+ std::string v;
+ v.assign(userId.data, userId.size);
+ return v;
+}
+
+bool Message::isPersistent() const
+{
+ return durable && durable.get();
+}
+bool Message::getTtl(uint64_t& t) const
+{
+ if (!ttl) {
+ return false;
+ } else {
+ t = ttl.get();
+ return true;
+ }
+}
+
+uint8_t Message::getPriority() const
+{
+ if (!priority) return 4;
+ else return priority.get();
+}
+
+std::string Message::getPropertyAsString(const std::string& /*key*/) const { return empty; }
+std::string Message::getAnnotationAsString(const std::string& /*key*/) const { return empty; }
+void Message::processProperties(MapHandler&) const {}
+
+//getContentSize() is primarily used in stats about the number of
+//bytes enqueued/dequeued etc, not sure whether this is the right name
+//and whether it should indeed only be the content that is thus
+//measured
+uint64_t Message::getContentSize() const { return data.size(); }
+//getContent() is used primarily for decoding qmf messages in management and ha
+std::string Message::getContent() const { return empty; }
+
+Message::Message(size_t size) : data(size)
+{
+ deliveryAnnotations.init();
+ messageAnnotations.init();
+ bareMessage.init();
+
+ userId.init();
+ to.init();
+ subject.init();
+ replyTo.init();
+ contentType.init();
+ contentEncoding.init();
+
+ applicationProperties.init();
+ body.init();
+ footer.init();
+}
+char* Message::getData() { return &data[0]; }
+const char* Message::getData() const { return &data[0]; }
+size_t Message::getSize() const { return data.size(); }
+
+qpid::amqp::MessageId Message::getMessageId() const
+{
+ return messageId;
+}
+qpid::amqp::CharSequence Message::getReplyTo() const
+{
+ return replyTo;
+}
+qpid::amqp::MessageId Message::getCorrelationId() const
+{
+ return correlationId;
+}
+qpid::amqp::CharSequence Message::getContentType() const
+{
+ return contentType;
+}
+qpid::amqp::CharSequence Message::getContentEncoding() const
+{
+ return contentEncoding;
+}
+
+qpid::amqp::CharSequence Message::getDeliveryAnnotations() const
+{
+ return deliveryAnnotations;
+}
+qpid::amqp::CharSequence Message::getMessageAnnotations() const
+{
+ return messageAnnotations;
+}
+qpid::amqp::CharSequence Message::getApplicationProperties() const
+{
+ return applicationProperties;
+}
+qpid::amqp::CharSequence Message::getBareMessage() const
+{
+ return bareMessage;
+}
+qpid::amqp::CharSequence Message::getBody() const
+{
+ return body;
+}
+qpid::amqp::CharSequence Message::getFooter() const
+{
+ return footer;
+}
+
+void Message::scan()
+{
+ qpid::amqp::Decoder decoder(getData(), getSize());
+ decoder.read(*this);
+ bareMessage = qpid::amqp::MessageReader::getBareMessage();
+ if (bareMessage.data && !bareMessage.size) {
+ bareMessage.size = getSize() - (bareMessage.data - getData());
+ }
+}
+
+const Message& Message::get(const qpid::broker::Message& message)
+{
+ const Message* m = dynamic_cast<const Message*>(&message.getEncoding());
+ if (!m) throw qpid::Exception("Translation not yet implemented!!");
+ return *m;
+}
+
+void Message::onDurable(bool b) { durable = b; }
+void Message::onPriority(uint8_t i) { priority = i; }
+void Message::onTtl(uint32_t i) { ttl = i; }
+void Message::onFirstAcquirer(bool b) { firstAcquirer = b; }
+void Message::onDeliveryCount(uint32_t i) { deliveryCount = i; }
+
+void Message::onMessageId(uint64_t v) { messageId.set(v); }
+void Message::onMessageId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { messageId.set(v, t); }
+void Message::onUserId(const qpid::amqp::CharSequence& v) { userId = v; }
+void Message::onTo(const qpid::amqp::CharSequence& v) { to = v; }
+void Message::onSubject(const qpid::amqp::CharSequence& v) { subject = v; }
+void Message::onReplyTo(const qpid::amqp::CharSequence& v) { replyTo = v; }
+void Message::onCorrelationId(uint64_t v) { correlationId.set(v); }
+void Message::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { correlationId.set(v, t);}
+void Message::onContentType(const qpid::amqp::CharSequence& v) { contentType = v; }
+void Message::onContentEncoding(const qpid::amqp::CharSequence& v) { contentEncoding = v; }
+void Message::onAbsoluteExpiryTime(int64_t) {}
+void Message::onCreationTime(int64_t) {}
+void Message::onGroupId(const qpid::amqp::CharSequence&) {}
+void Message::onGroupSequence(uint32_t) {}
+void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {}
+
+void Message::onApplicationProperties(const qpid::amqp::CharSequence& v) { applicationProperties = v; }
+void Message::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { deliveryAnnotations = v; }
+void Message::onMessageAnnotations(const qpid::amqp::CharSequence& v) { messageAnnotations = v; }
+void Message::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&) { body = v; }
+void Message::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {}
+void Message::onFooter(const qpid::amqp::CharSequence& v) { footer = v; }
+
+
+//PersistableMessage interface:
+void Message::encode(framing::Buffer& buffer) const
+{
+ buffer.putLong(0);//4-byte format indicator
+ buffer.putRawData((const uint8_t*) getData(), getSize());
+ QPID_LOG(debug, "Encoded 1.0 message of " << getSize() << " bytes, including " << bareMessage.size << " bytes of 'bare message'");
+}
+uint32_t Message::encodedSize() const
+{
+ return 4/*format indicator*/ + data.size();
+}
+//in 1.0 the binary header/content makes less sense and in any case
+//the functionality that split originally supported (i.e. lazy-loaded
+//messages) is no longer in use; for 1.0 we therefore treat the whole
+//content as 'header' and load it in the first stage.
+uint32_t Message::encodedHeaderSize() const
+{
+ return encodedSize();
+}
+void Message::decodeHeader(framing::Buffer& buffer)
+{
+ if (buffer.available() != getSize()) {
+ QPID_LOG(warning, "1.0 Message buffer was " << data.size() << " bytes, but " << buffer.available() << " bytes are available. Resizing.");
+ data.resize(buffer.available());
+ }
+ buffer.getRawData((uint8_t*) getData(), getSize());
+ scan();
+ QPID_LOG(debug, "Decoded 1.0 message of " << getSize() << " bytes, including " << bareMessage.size << " bytes of 'bare message'");
+}
+void Message::decodeContent(framing::Buffer& /*buffer*/) {}
+
+boost::intrusive_ptr<PersistableMessage> Message::merge(const std::map<std::string, qpid::types::Variant>& annotations) const
+{
+ //message- or delivery- annotations? would have to determine that from the name, for now assume always message-annotations
+ size_t extra = 0;
+ if (messageAnnotations) {
+ //TODO: actual merge required
+ } else {
+ //add whole new section
+ extra = qpid::amqp::MessageEncoder::getEncodedSize(annotations, true);
+ }
+ boost::intrusive_ptr<Message> copy(new Message(data.size()+extra));
+ size_t position(0);
+ if (deliveryAnnotations) {
+ ::memcpy(&copy->data[position], deliveryAnnotations.data, deliveryAnnotations.size);
+ position += deliveryAnnotations.size;
+ }
+ if (messageAnnotations) {
+ //TODO: actual merge required
+ ::memcpy(&copy->data[position], messageAnnotations.data, messageAnnotations.size);
+ position += messageAnnotations.size;
+ } else {
+ qpid::amqp::MessageEncoder encoder(&copy->data[position], extra);
+ encoder.writeMap(annotations, &qpid::amqp::message::MESSAGE_ANNOTATIONS, true);
+ position += extra;
+ }
+ if (bareMessage) {
+ ::memcpy(&copy->data[position], bareMessage.data, bareMessage.size);
+ position += bareMessage.size;
+ }
+ if (footer) {
+ ::memcpy(&copy->data[position], footer.data, footer.size);
+ position += footer.size;
+ }
+ copy->scan();
+ return copy;
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Message.h b/cpp/src/qpid/broker/amqp/Message.h
new file mode 100644
index 0000000000..cc3406f72a
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Message.h
@@ -0,0 +1,148 @@
+#ifndef QPID_BROKER_AMQP_MESSAGE_H
+#define QPID_BROKER_AMQP_MESSAGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Message.h"
+#include "qpid/amqp/CharSequence.h"
+#include "qpid/amqp/MessageId.h"
+#include "qpid/amqp/MessageReader.h"
+#include <boost/optional.hpp>
+
+namespace qpid {
+namespace framing {
+class Buffer;
+}
+namespace broker {
+namespace amqp {
+
+/**
+ * Represents an AMQP 1.0 format message
+ */
+class Message : public qpid::broker::Message::Encoding, private qpid::amqp::MessageReader, public qpid::broker::PersistableMessage
+{
+ public:
+ //Encoding interface:
+ std::string getRoutingKey() const;
+ bool isPersistent() const;
+ uint8_t getPriority() const;
+ uint64_t getContentSize() const;
+ std::string getPropertyAsString(const std::string& key) const;
+ std::string getAnnotationAsString(const std::string& key) const;
+ bool getTtl(uint64_t&) const;
+ std::string getContent() const;
+ void processProperties(MapHandler&) const;
+ std::string getUserId() const;
+
+ qpid::amqp::MessageId getMessageId() const;
+ qpid::amqp::CharSequence getReplyTo() const;
+ qpid::amqp::MessageId getCorrelationId() const;
+ qpid::amqp::CharSequence getContentType() const;
+ qpid::amqp::CharSequence getContentEncoding() const;
+
+ qpid::amqp::CharSequence getDeliveryAnnotations() const;
+ qpid::amqp::CharSequence getMessageAnnotations() const;
+ qpid::amqp::CharSequence getApplicationProperties() const;
+ qpid::amqp::CharSequence getBareMessage() const;
+ qpid::amqp::CharSequence getBody() const;
+ qpid::amqp::CharSequence getFooter() const;
+
+ Message(size_t size);
+ char* getData();
+ const char* getData() const;
+ size_t getSize() const;
+ void scan();
+
+ //PersistableMessage interface:
+ void encode(framing::Buffer& buffer) const;
+ uint32_t encodedSize() const;
+ void decodeHeader(framing::Buffer& buffer);
+ void decodeContent(framing::Buffer& buffer);
+ uint32_t encodedHeaderSize() const;
+ boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const;
+
+ static const Message& get(const qpid::broker::Message&);
+ private:
+ std::vector<char> data;
+
+ //header:
+ boost::optional<bool> durable;
+ boost::optional<uint8_t> priority;
+ boost::optional<uint32_t> ttl;
+ boost::optional<bool> firstAcquirer;
+ boost::optional<uint32_t> deliveryCount;
+ //annotations:
+ qpid::amqp::CharSequence deliveryAnnotations;
+ qpid::amqp::CharSequence messageAnnotations;
+
+ qpid::amqp::CharSequence bareMessage;//properties, application-properties and content
+ //properties:
+ qpid::amqp::MessageId messageId;
+ qpid::amqp::CharSequence userId;
+ qpid::amqp::CharSequence to;
+ qpid::amqp::CharSequence subject;
+ qpid::amqp::CharSequence replyTo;
+ qpid::amqp::MessageId correlationId;
+ qpid::amqp::CharSequence contentType;
+ qpid::amqp::CharSequence contentEncoding;
+
+ //application-properties:
+ qpid::amqp::CharSequence applicationProperties;
+
+ //body:
+ qpid::amqp::CharSequence body;
+
+ //footer:
+ qpid::amqp::CharSequence footer;
+
+ //header:
+ void onDurable(bool b);
+ void onPriority(uint8_t i);
+ void onTtl(uint32_t i);
+ void onFirstAcquirer(bool b);
+ void onDeliveryCount(uint32_t i);
+ //properties:
+ void onMessageId(uint64_t);
+ void onMessageId(const qpid::amqp::CharSequence&, qpid::types::VariantType);
+ void onUserId(const qpid::amqp::CharSequence& v);
+ void onTo(const qpid::amqp::CharSequence& v);
+ void onSubject(const qpid::amqp::CharSequence& v);
+ void onReplyTo(const qpid::amqp::CharSequence& v);
+ void onCorrelationId(uint64_t);
+ void onCorrelationId(const qpid::amqp::CharSequence&, qpid::types::VariantType);
+ void onContentType(const qpid::amqp::CharSequence& v);
+ void onContentEncoding(const qpid::amqp::CharSequence& v);
+ void onAbsoluteExpiryTime(int64_t i);
+ void onCreationTime(int64_t);
+ void onGroupId(const qpid::amqp::CharSequence&);
+ void onGroupSequence(uint32_t);
+ void onReplyToGroupId(const qpid::amqp::CharSequence&);
+
+ void onApplicationProperties(const qpid::amqp::CharSequence&);
+ void onDeliveryAnnotations(const qpid::amqp::CharSequence&);
+ void onMessageAnnotations(const qpid::amqp::CharSequence&);
+ void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&);
+ void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&);
+ void onFooter(const qpid::amqp::CharSequence&);
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_MESSAGE_H*/
diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/cpp/src/qpid/broker/amqp/NodeProperties.cpp
new file mode 100644
index 0000000000..eea7612cb9
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/NodeProperties.cpp
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/NodeProperties.h"
+#include "qpid/broker/amqp/DataReader.h"
+#include "qpid/amqp/CharSequence.h"
+#include "qpid/types/Variant.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/log/Statement.h"
+
+using qpid::amqp::CharSequence;
+using qpid::amqp::Descriptor;
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+//distribution modes:
+const std::string MOVE("move");
+const std::string COPY("copy");
+const std::string SUPPORTED_DIST_MODES("supported-dist-modes");
+
+//AMQP 0-10 standard parameters:
+const std::string DURABLE("durable");
+const std::string AUTO_DELETE("auto-delete");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string EXCHANGE_TYPE("exchange-type");
+}
+
+NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exchangeType("topic") {}
+
+void NodeProperties::read(pn_data_t* data)
+{
+ DataReader reader(*this);
+ reader.read(data);
+}
+
+void NodeProperties::process(const std::string& key, const qpid::types::Variant& value)
+{
+ QPID_LOG(notice, "Processing node property " << key << " = " << value);
+ if (key == SUPPORTED_DIST_MODES) {
+ if (value == MOVE) queue = true;
+ else if (value == COPY) queue = false;
+ } else if (key == DURABLE) {
+ durable = value;
+ } else if (key == AUTO_DELETE) {
+ autoDelete = value;
+ } else if (key == ALTERNATE_EXCHANGE) {
+ alternateExchange = value.asString();
+ } else if (key == EXCHANGE_TYPE) {
+ exchangeType = value.asString();
+ } else {
+ properties[key] = value;
+ }
+}
+
+void NodeProperties::onNullValue(const CharSequence& key, const Descriptor*)
+{
+ process(key.str(), qpid::types::Variant());
+}
+
+void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ process(key.str(), value.str());
+}
+
+void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ process(key.str(), value.str());
+}
+
+void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ process(key.str(), value.str());
+}
+
+QueueSettings NodeProperties::getQueueSettings()
+{
+ QueueSettings settings(durable, autoDelete);
+ qpid::types::Variant::Map unused;
+ settings.populate(properties, unused);
+ return settings;
+}
+
+bool NodeProperties::isQueue() const
+{
+ return queue;
+}
+bool NodeProperties::isDurable() const
+{
+ return durable;
+}
+std::string NodeProperties::getExchangeType() const
+{
+ return exchangeType;
+}
+std::string NodeProperties::getAlternateExchange() const
+{
+ return alternateExchange;
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.h b/cpp/src/qpid/broker/amqp/NodeProperties.h
new file mode 100644
index 0000000000..b81d1d712c
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/NodeProperties.h
@@ -0,0 +1,71 @@
+#ifndef QPID_BROKER_AMQP_NODEPROPERTIES_H
+#define QPID_BROKER_AMQP_NODEPROPERTIES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp/MapReader.h"
+#include "qpid/types/Variant.h"
+
+struct pn_data_t;
+namespace qpid {
+namespace broker {
+struct QueueSettings;
+namespace amqp {
+
+class NodeProperties : public qpid::amqp::MapReader
+{
+ public:
+ NodeProperties();
+ void read(pn_data_t*);
+ void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
+ void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*);
+ void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*);
+ void onUShortValue(const qpid::amqp::CharSequence&, uint16_t, const qpid::amqp::Descriptor*);
+ void onUIntValue(const qpid::amqp::CharSequence&, uint32_t, const qpid::amqp::Descriptor*);
+ void onULongValue(const qpid::amqp::CharSequence&, uint64_t, const qpid::amqp::Descriptor*);
+ void onByteValue(const qpid::amqp::CharSequence&, int8_t, const qpid::amqp::Descriptor*);
+ void onShortValue(const qpid::amqp::CharSequence&, int16_t, const qpid::amqp::Descriptor*);
+ void onIntValue(const qpid::amqp::CharSequence&, int32_t, const qpid::amqp::Descriptor*);
+ void onLongValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*);
+ void onFloatValue(const qpid::amqp::CharSequence&, float, const qpid::amqp::Descriptor*);
+ void onDoubleValue(const qpid::amqp::CharSequence&, double, const qpid::amqp::Descriptor*);
+ void onUuidValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
+ void onTimestampValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*);
+ void onStringValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
+ void onSymbolValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
+ bool isQueue() const;
+ QueueSettings getQueueSettings();
+ bool isDurable() const;
+ std::string getExchangeType() const;
+ std::string getAlternateExchange() const;
+ private:
+ bool queue;
+ bool durable;
+ bool autoDelete;
+ std::string exchangeType;
+ std::string alternateExchange;
+ qpid::types::Variant::Map properties;
+
+ void process(const std::string&, const qpid::types::Variant&);
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_NODEPROPERTIES_H*/
diff --git a/cpp/src/qpid/broker/amqp/Outgoing.cpp b/cpp/src/qpid/broker/amqp/Outgoing.cpp
new file mode 100644
index 0000000000..9605cacac1
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -0,0 +1,244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Outgoing.h"
+#include "qpid/broker/amqp/Header.h"
+#include "qpid/broker/amqp/Translation.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/TopicKeyNode.h"
+#include "qpid/sys/OutputControl.h"
+#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+Outgoing::Outgoing(Broker& broker, boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession& session, qpid::sys::OutputControl& o, bool topic)
+ : Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
+ ManagedOutgoingLink(broker, *q, session, pn_link_name(l), topic),
+ exclusive(topic),
+ queue(q), deliveries(5000), link(l), out(o),
+ current(0), outstanding(0),
+ buffer(1024)/*used only for header at present*/
+{
+ for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
+ deliveries[i].init(i);
+ }
+}
+
+void Outgoing::init()
+{
+ queue->consume(shared_from_this(), exclusive);//may throw exception
+}
+
+bool Outgoing::dispatch()
+{
+ QPID_LOG(trace, "Dispatching to " << getName() << ": " << pn_link_credit(link));
+ if (canDeliver()) {
+ if (queue->dispatch(shared_from_this())) {
+ return true;
+ } else {
+ pn_link_drained(link);
+ QPID_LOG(debug, "No message available on " << queue->getName());
+ }
+ } else {
+ QPID_LOG(debug, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link));
+ }
+ return false;
+}
+
+void Outgoing::write(const char* data, size_t size)
+{
+ pn_link_send(link, data, size);
+}
+
+void Outgoing::handle(pn_delivery_t* delivery)
+{
+ pn_delivery_tag_t tag = pn_delivery_tag(delivery);
+ size_t i = *reinterpret_cast<const size_t*>(tag.bytes);
+ Record& r = deliveries[i];
+ if (pn_delivery_writable(delivery)) {
+ assert(r.msg);
+ assert(!r.delivery);
+ r.delivery = delivery;
+ //write header
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeHeader(Header(r.msg));
+ write(&buffer[0], encoder.getPosition());
+ Translation t(r.msg);
+ t.write(*this);
+ if (pn_link_advance(link)) {
+ --outstanding;
+ outgoingMessageSent();
+ QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
+ } else {
+ QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
+ }
+ }
+ if (pn_delivery_updated(delivery)) {
+ assert(r.delivery == delivery);
+ r.disposition = pn_delivery_remote_state(delivery);
+ if (r.disposition) {
+ switch (r.disposition) {
+ case PN_ACCEPTED:
+ //TODO: only if consuming
+ queue->dequeue(0, r.cursor);
+ outgoingMessageAccepted();
+ break;
+ case PN_REJECTED:
+ queue->reject(r.cursor);
+ outgoingMessageRejected();
+ break;
+ case PN_RELEASED:
+ queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented
+ outgoingMessageRejected();//TODO: not quite true...
+ break;
+ case PN_MODIFIED:
+ queue->release(r.cursor, true);//TODO: proper handling of modified
+ outgoingMessageRejected();//TODO: not quite true...
+ break;
+ default:
+ QPID_LOG(warning, "Unhandled disposition: " << r.disposition);
+ }
+ //TODO: ony settle once any dequeue on store has completed
+ pn_delivery_settle(delivery);
+ r.reset();
+ }
+ }
+}
+
+bool Outgoing::canDeliver()
+{
+ return deliveries[current].delivery == 0 && pn_link_credit(link) > outstanding;
+}
+
+void Outgoing::detached()
+{
+ QPID_LOG(debug, "Detaching outgoing link from " << queue->getName());
+ queue->cancel(shared_from_this());
+ //TODO: release in a clearer order?
+ for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
+ if (deliveries[i].msg) queue->release(deliveries[i].cursor, true);
+ }
+ Queue::tryAutoDelete(*queue->getBroker(), queue, "", "");
+}
+
+//Consumer interface:
+bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
+{
+ Record& r = deliveries[current++];
+ if (current >= deliveries.capacity()) current = 0;
+ r.cursor = cursor;
+ r.msg = msg;
+ pn_delivery(link, r.tag);
+ QPID_LOG(debug, "Requested delivery of " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
+ ++outstanding;
+ return true;
+}
+
+void Outgoing::notify()
+{
+ QPID_LOG(trace, "Notification received for " << queue->getName());
+ out.activateOutput();
+}
+
+bool Outgoing::accept(const qpid::broker::Message&)
+{
+ return true;
+}
+
+void Outgoing::setSubjectFilter(const std::string& f)
+{
+ subjectFilter = f;
+}
+
+namespace {
+
+bool match(TokenIterator& filter, TokenIterator& target)
+{
+ bool wild = false;
+ while (!filter.finished())
+ {
+ if (filter.match1('*')) {
+ if (target.finished()) return false;
+ //else move to next word in filter target
+ filter.next();
+ target.next();
+ } else if (filter.match1('#')) {
+ // i.e. filter word is '#' which can match a variable number of words in the target
+ filter.next();
+ if (filter.finished()) return true;
+ else if (target.finished()) return false;
+ wild = true;
+ } else {
+ //filter word needs to match target exactly
+ if (target.finished()) return false;
+ std::string word;
+ target.pop(word);
+ if (filter.match(word)) {
+ wild = false;
+ filter.next();
+ } else if (!wild) {
+ return false;
+ }
+ }
+ }
+ return target.finished();
+}
+bool match(const std::string& filter, const std::string& target)
+{
+ TokenIterator lhs(filter);
+ TokenIterator rhs(target);
+ return match(lhs, rhs);
+}
+}
+
+bool Outgoing::filter(const qpid::broker::Message& m)
+{
+ return subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey());
+}
+
+void Outgoing::cancel() {}
+
+void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {}
+
+qpid::broker::OwnershipToken* Outgoing::getSession()
+{
+ return 0;
+}
+
+Outgoing::Record::Record() : delivery(0), disposition(0), index(0) {}
+void Outgoing::Record::init(size_t i)
+{
+ index = i;
+ tag.bytes = reinterpret_cast<const char*>(&index);
+ tag.size = sizeof(index);
+}
+void Outgoing::Record::reset()
+{
+ cursor = QueueCursor();
+ msg = qpid::broker::Message();
+ delivery = 0;
+ disposition = 0;
+}
+
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Outgoing.h b/cpp/src/qpid/broker/amqp/Outgoing.h
new file mode 100644
index 0000000000..a8450a48cf
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -0,0 +1,108 @@
+#ifndef QPID_BROKER_AMQP1_OUTGOING_H
+#define QPID_BROKER_AMQP1_OUTGOING_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Message.h"
+#include "qpid/broker/amqp/ManagedOutgoingLink.h"
+#include "qpid/broker/Consumer.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace sys {
+class OutputControl;
+}
+namespace broker {
+class Broker;
+class Queue;
+namespace amqp {
+class ManagedSession;
+template <class T>
+class CircularArray
+{
+ public:
+ CircularArray(size_t l) : limit(l), data(new T[limit]) {}
+ T& operator[](size_t i) { return data[i]; }
+ size_t capacity() { return limit; }
+ ~CircularArray() { delete [] data; }
+ private:
+ const size_t limit;
+ T* const data;
+ size_t next;
+};
+
+/**
+ *
+ */
+class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from_this<Outgoing>, public ManagedOutgoingLink
+{
+ public:
+ Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession&, qpid::sys::OutputControl& o, bool topic);
+ void setSubjectFilter(const std::string&);
+ void init();
+ bool dispatch();
+ void write(const char* data, size_t size);
+ void handle(pn_delivery_t* delivery);
+ bool canDeliver();
+ void detached();
+
+ //Consumer interface:
+ bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
+ void notify();
+ bool accept(const qpid::broker::Message&);
+ bool filter(const qpid::broker::Message&);
+ void cancel();
+ void acknowledged(const qpid::broker::DeliveryRecord&);
+ qpid::broker::OwnershipToken* getSession();
+
+ private:
+
+ struct Record
+ {
+ QueueCursor cursor;
+ qpid::broker::Message msg;
+ pn_delivery_t* delivery;
+ int disposition;
+ size_t index;
+ pn_delivery_tag_t tag;
+
+ Record();
+ void init(size_t i);
+ void reset();
+ };
+
+ const bool exclusive;
+ boost::shared_ptr<Queue> queue;
+ CircularArray<Record> deliveries;
+ pn_link_t* link;
+ qpid::sys::OutputControl& out;
+ size_t current;
+ int outstanding;
+ std::vector<char> buffer;
+ std::string subjectFilter;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP1_OUTGOING_H*/
diff --git a/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
new file mode 100644
index 0000000000..711592257c
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/Plugin.h"
+#include "qpid/SaslFactory.h"
+#include "qpid/NullSaslServer.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Protocol.h"
+#include "qpid/broker/RecoverableMessage.h"
+#include "qpid/broker/RecoverableMessageImpl.h"
+#include "qpid/broker/amqp/Connection.h"
+#include "qpid/broker/amqp/Message.h"
+#include "qpid/broker/amqp/Sasl.h"
+#include "qpid/broker/amqp/Translation.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+class ProtocolImpl : public Protocol
+{
+ public:
+ ProtocolImpl(Broker& b) : broker(b) {}
+ qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&);
+ boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
+ private:
+ Broker& broker;
+};
+
+struct ProtocolPlugin : public Plugin
+{
+ void earlyInitialize(Plugin::Target& target)
+ {
+ //need to register protocol before recovery from store
+ broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target);
+ if (broker) {
+ broker->getProtocolRegistry().add("AMQP 1.0", new ProtocolImpl(*broker));
+ }
+ }
+
+ void initialize(Plugin::Target&) {}
+};
+
+ProtocolPlugin instance; // Static initialization
+
+qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVersion& v, qpid::sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& external)
+{
+ if (v == qpid::framing::ProtocolVersion(1, 0)) {
+ if (v.getProtocol() == qpid::framing::ProtocolVersion::SASL) {
+ if (broker.getOptions().auth) {
+ QPID_LOG(info, "Using AMQP 1.0 (with SASL layer)");
+ return new qpid::broker::amqp::Sasl(out, id, broker, qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm, broker.getOptions().requireEncrypted, external));
+ } else {
+ std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(broker.getOptions().realm));
+ QPID_LOG(info, "Using AMQP 1.0 (with dummy SASL layer)");
+ return new qpid::broker::amqp::Sasl(out, id, broker, authenticator);
+ }
+ } else {
+ if (broker.getOptions().auth) {
+ throw qpid::Exception("SASL layer required!");
+ } else {
+ QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)");
+ return new qpid::broker::amqp::Connection(out, id, broker, false);
+ }
+ }
+ }
+ return 0;
+}
+
+boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolImpl::translate(const qpid::broker::Message& m)
+{
+ qpid::broker::amqp::Translation t(m);
+ return t.getTransfer();
+}
+
+boost::shared_ptr<RecoverableMessage> ProtocolImpl::recover(qpid::framing::Buffer& buffer)
+{
+ QPID_LOG(debug, "Recovering, checking for 1.0 message format indicator...");
+ uint32_t format = buffer.getLong();
+ if (format == 0) {
+ QPID_LOG(debug, "Recovered message IS in 1.0 format");
+ //this is a 1.0 format message
+ boost::intrusive_ptr<qpid::broker::amqp::Message> m(new qpid::broker::amqp::Message(buffer.available()));
+ m->decodeHeader(buffer);
+ return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(qpid::broker::Message(m, m)));
+ } else {
+ QPID_LOG(debug, "Recovered message is NOT in 1.0 format");
+ return RecoverableMessage::shared_ptr();
+ }
+}
+
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Sasl.cpp b/cpp/src/qpid/broker/amqp/Sasl.cpp
new file mode 100644
index 0000000000..4b89e7b15d
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Sasl.cpp
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Sasl.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/sys/OutputControl.h"
+#include "qpid/sys/SecurityLayer.h"
+#include <boost/format.hpp>
+#include <vector>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, std::auto_ptr<qpid::SaslServer> auth)
+ : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, true),
+ authenticator(auth),
+ state(INCOMPLETE), writeHeader(true), haveOutput(true)
+{
+ out.activateOutput();
+ mechanisms(authenticator->getMechanisms());
+}
+
+Sasl::~Sasl() {}
+
+size_t Sasl::decode(const char* buffer, size_t size)
+{
+ if (state == AUTHENTICATED) {
+ if (securityLayer.get()) return securityLayer->decode(buffer, size);
+ else return connection.decode(buffer, size);
+ } else if (state == INCOMPLETE && size) {
+ size_t decoded = read(buffer, size);
+ QPID_LOG(trace, id << " Sasl::decode(" << size << "): " << decoded);
+ return decoded;
+ } else {
+ return 0;
+ }
+}
+
+size_t Sasl::encode(char* buffer, size_t size)
+{
+ if (state == AUTHENTICATED) {
+ if (securityLayer.get()) return securityLayer->encode(buffer, size);
+ else return connection.encode(buffer, size);
+ } else {
+ size_t encoded = 0;
+ if (writeHeader) {
+ encoded += writeProtocolHeader(buffer, size);
+ if (!encoded) return 0;
+ writeHeader = false;
+ }
+ if (encoded < size) {
+ encoded += write(buffer + encoded, size - encoded);
+ }
+ if (state == SUCCESS_PENDING) {
+ state = AUTHENTICATED;
+ } else if (state == FAILURE_PENDING) {
+ state = FAILED;
+ } else {
+ haveOutput = (encoded == size);
+ }
+ QPID_LOG(trace, id << " Sasl::encode(" << size << "): " << encoded);
+ return encoded;
+ }
+}
+
+bool Sasl::canEncode()
+{
+ if (state == AUTHENTICATED) {
+ if (securityLayer.get()) return securityLayer->canEncode();
+ else return connection.canEncode();
+ } else {
+ return haveOutput;
+ }
+}
+
+void Sasl::closed()
+{
+ if (state == AUTHENTICATED) {
+ connection.closed();
+ } else {
+ QPID_LOG(info, id << " Connection closed prior to authentication completing");
+ state = FAILED;
+ }
+}
+bool Sasl::isClosed() const
+{
+ if (state == AUTHENTICATED) {
+ return connection.isClosed();
+ } else {
+ return state == FAILED;
+ }
+}
+
+framing::ProtocolVersion Sasl::getVersion() const
+{
+ return connection.getVersion();
+}
+namespace {
+const std::string EMPTY;
+}
+
+void Sasl::init(const std::string& mechanism, const std::string* response, const std::string* /*hostname*/)
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-INIT(" << mechanism << ", " << (response ? *response : EMPTY) << ")");
+ //TODO: what should we do with hostname here?
+ std::string c;
+ respond(authenticator->start(mechanism, response, c), c);
+ connection.setSaslMechanism(mechanism);
+}
+
+void Sasl::response(const std::string* r)
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-RESPONSE(" << (r ? *r : EMPTY) << ")");
+ std::string c;
+ respond(authenticator->step(r, c), c);
+}
+
+void Sasl::respond(qpid::SaslServer::Status status, const std::string& chllnge)
+{
+ switch (status) {
+ case qpid::SaslServer::OK:
+ connection.setUserid(authenticator->getUserid());
+ completed(true);
+ //can't set authenticated & failed until we have actually sent the outcome
+ state = SUCCESS_PENDING;
+ securityLayer = authenticator->getSecurityLayer(65535);
+ if (securityLayer.get()) {
+ QPID_LOG_CAT(info, security, id << " Security layer installed");
+ securityLayer->init(&connection);
+ connection.setSaslSsf(securityLayer->getSsf());
+ }
+ QPID_LOG_CAT(info, security, id << " Authenticated as " << authenticator->getUserid());
+ break;
+ case qpid::SaslServer::FAIL:
+ completed(false);
+ state = FAILURE_PENDING;
+ QPID_LOG_CAT(info, security, id << " Failed to authenticate");
+ break;
+ case qpid::SaslServer::CHALLENGE:
+ challenge(&chllnge);
+ QPID_LOG_CAT(info, security, id << " Challenge issued");
+ break;
+ }
+ haveOutput = true;
+ out.activateOutput();
+}
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Sasl.h b/cpp/src/qpid/broker/amqp/Sasl.h
new file mode 100644
index 0000000000..079128be02
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Sasl.h
@@ -0,0 +1,72 @@
+#ifndef QPID_BROKER_AMQP_SASL_H
+#define QPID_BROKER_AMQP_SASL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Connection.h"
+#include "qpid/SaslServer.h"
+#include "qpid/amqp/SaslServer.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include <memory>
+namespace qpid {
+namespace sys {
+class SecurityLayer;
+}
+namespace broker {
+namespace amqp {
+
+/**
+ * An AMQP 1.0 SASL Security Layer for authentication and optionally
+ * encryption.
+ */
+class Sasl : public sys::ConnectionCodec, qpid::amqp::SaslServer
+{
+ public:
+ Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, std::auto_ptr<qpid::SaslServer> authenticator);
+ ~Sasl();
+
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(char* buffer, size_t size);
+ bool canEncode();
+
+ void closed();
+ bool isClosed() const;
+
+ framing::ProtocolVersion getVersion() const;
+ private:
+ qpid::sys::OutputControl& out;
+ Connection connection;
+ std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
+ std::auto_ptr<qpid::SaslServer> authenticator;
+ enum {
+ INCOMPLETE, SUCCESS_PENDING, FAILURE_PENDING, AUTHENTICATED, FAILED
+ } state;
+
+ bool writeHeader;
+ bool haveOutput;
+
+ void init(const std::string& mechanism, const std::string* response, const std::string* hostname);
+ void response(const std::string*);
+ void respond(qpid::SaslServer::Status status, const std::string& challenge);
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_SASL_H*/
diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp
new file mode 100644
index 0000000000..fabe609473
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Session.cpp
@@ -0,0 +1,332 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Session.h"
+#include "Outgoing.h"
+#include "Message.h"
+#include "ManagedConnection.h"
+#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/amqp/Filter.h"
+#include "qpid/broker/amqp/NodeProperties.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+#include <boost/intrusive_ptr.hpp>
+#include <boost/format.hpp>
+#include <map>
+#include <sstream>
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+class Target
+{
+ public:
+ Target(pn_link_t* l) : credit(100), window(0), link(l) {}
+ virtual ~Target() {}
+ bool flow();
+ bool needFlow();
+ virtual void handle(qpid::broker::Message& m) = 0;//TODO: revise this for proper message
+ protected:
+ const uint32_t credit;
+ uint32_t window;
+ pn_link_t* link;
+};
+
+class Queue : public Target
+{
+ public:
+ Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : Target(l), queue(q) {}
+ void handle(qpid::broker::Message& m);
+ private:
+ boost::shared_ptr<qpid::broker::Queue> queue;
+};
+
+class Exchange : public Target
+{
+ public:
+ Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : Target(l), exchange(e) {}
+ void handle(qpid::broker::Message& m);
+ private:
+ boost::shared_ptr<qpid::broker::Exchange> exchange;
+};
+
+Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o)
+ : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {}
+
+
+Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus)
+{
+ ResolvedNode node;
+ node.exchange = broker.getExchanges().find(name);
+ node.queue = broker.getQueues().find(name);
+ if (!node.queue && !node.exchange && pn_terminus_is_dynamic(terminus)) {
+ //TODO: handle dynamic creation
+ //is it a queue or an exchange?
+ NodeProperties properties;
+ properties.read(pn_terminus_properties(terminus));
+ if (properties.isQueue()) {
+ node.queue = broker.createQueue(name, properties.getQueueSettings(), this, properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first;
+ } else {
+ qpid::framing::FieldTable args;
+ node.exchange = broker.createExchange(name, properties.getExchangeType(), properties.isDurable(), properties.getAlternateExchange(),
+ args, connection.getUserid(), connection.getId()).first;
+ }
+ } else if (node.queue && node.exchange) {
+ QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
+ node.exchange.reset();
+ }
+ return node;
+}
+
+void Session::attach(pn_link_t* link)
+{
+ if (pn_link_is_sender(link)) {
+ pn_terminus_t* source = pn_link_remote_source(link);
+ //i.e a subscription
+ if (pn_terminus_get_type(source) == PN_UNSPECIFIED) {
+ throw qpid::Exception("No source specified!");/*invalid-field?*/
+ }
+ std::string name = pn_terminus_get_address(source);
+ QPID_LOG(debug, "Received attach request for outgoing link from " << name);
+ pn_terminus_set_address(pn_link_source(link), name.c_str());
+
+ ResolvedNode node = resolve(name, source);
+ Filter filter;
+ filter.read(pn_terminus_filter(source));
+
+ if (node.queue) {
+ boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, link, *this, out, false));
+ q->init();
+ if (filter.hasSubjectFilter()) {
+ q->setSubjectFilter(filter.getSubjectFilter());
+ }
+ senders[link] = q;
+ } else if (node.exchange) {
+ QueueSettings settings(false, true);
+ //TODO: populate settings from source details when available from engine
+ boost::shared_ptr<qpid::broker::Queue> queue
+ = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
+ if (filter.hasSubjectFilter()) {
+ filter.bind(node.exchange, queue);
+ filter.write(pn_terminus_filter(pn_link_source(link)));
+ } else if (node.exchange->getType() == FanOutExchange::typeName) {
+ node.exchange->bind(queue, std::string(), 0);
+ } else if (node.exchange->getType() == TopicExchange::typeName) {
+ node.exchange->bind(queue, "#", 0);
+ } else {
+ throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/
+ }
+ boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, true));
+ senders[link] = q;
+ q->init();
+ } else {
+ pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
+ throw qpid::Exception("Node not found: " + name);/*not-found*/
+ }
+ QPID_LOG(debug, "Outgoing link attached");
+ } else {
+ pn_terminus_t* target = pn_link_remote_target(link);
+ if (pn_terminus_get_type(target) == PN_UNSPECIFIED) {
+ throw qpid::Exception("No target specified!");/*invalid field?*/
+ }
+ std::string name = pn_terminus_get_address(target);
+ QPID_LOG(debug, "Received attach request for incoming link to " << name);
+ pn_terminus_set_address(pn_link_target(link), name.c_str());
+
+ ResolvedNode node = resolve(name, target);
+
+ if (node.queue) {
+ boost::shared_ptr<Target> q(new Queue(node.queue, link));
+ targets[link] = q;
+ } else if (node.exchange) {
+ boost::shared_ptr<Target> e(new Exchange(node.exchange, link));
+ targets[link] = e;
+ } else {
+ pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
+ throw qpid::Exception("Node not found: " + name);/*not-found*/
+ }
+ QPID_LOG(debug, "Incoming link attached");
+ }
+}
+
+void Session::detach(pn_link_t* link)
+{
+ if (pn_link_is_sender(link)) {
+ Senders::iterator i = senders.find(link);
+ if (i != senders.end()) {
+ i->second->detached();
+ senders.erase(i);
+ QPID_LOG(debug, "Outgoing link detached");
+ }
+ } else {
+ targets.erase(link);
+ QPID_LOG(debug, "Incoming link detached");
+ }
+}
+namespace {
+ class Transfer : public qpid::broker::AsyncCompletion::Callback
+ {
+ public:
+ Transfer(pn_delivery_t* d, boost::shared_ptr<Session> s) : delivery(d), session(s) {}
+ void completed(bool sync) { session->accepted(delivery, sync); }
+ boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> clone()
+ {
+ boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new Transfer(delivery, session));
+ return copy;
+ }
+ private:
+ pn_delivery_t* delivery;
+ boost::shared_ptr<Session> session;
+ };
+}
+
+void Session::accepted(pn_delivery_t* delivery, bool sync)
+{
+ if (sync) {
+ //this is on IO thread
+ pn_delivery_update(delivery, PN_ACCEPTED);
+ pn_delivery_settle(delivery);//do we need to check settlement modes/orders?
+ incomingMessageAccepted();
+ } else {
+ //this is not on IO thread, need to delay processing until on IO thread
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (!deleted) {
+ completed.push_back(delivery);
+ out.activateOutput();
+ }
+ }
+}
+
+void Session::incoming(pn_link_t* link, pn_delivery_t* delivery)
+{
+ pn_delivery_tag_t tag = pn_delivery_tag(delivery);
+ QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size));
+ boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery)));
+ /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize());
+ received->scan();
+ pn_link_advance(link);
+
+ qpid::broker::Message message(received, received);
+
+ incomingMessageReceived();
+ Targets::iterator target = targets.find(link);
+ if (target == targets.end()) {
+ QPID_LOG(error, "Received message on unknown link");
+ pn_delivery_update(delivery, PN_REJECTED);
+ pn_delivery_settle(delivery);//do we need to check settlement modes/orders?
+ incomingMessageRejected();
+ } else {
+ target->second->handle(message);
+ received->begin();
+ Transfer t(delivery, shared_from_this());
+ received->end(t);
+ if (target->second->needFlow()) out.activateOutput();
+ }
+}
+void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery)
+{
+ Senders::iterator sender = senders.find(link);
+ if (sender == senders.end()) {
+ QPID_LOG(error, "Delivery returned for unknown link");
+ } else {
+ sender->second->handle(delivery);
+ }
+}
+
+bool Session::dispatch()
+{
+ bool output(false);
+ for (Senders::iterator s = senders.begin(); s != senders.end(); ++s) {
+ if (s->second->dispatch()) output = true;
+ }
+ if (completed.size()) {
+ output = true;
+ std::deque<pn_delivery_t*> copy;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ completed.swap(copy);
+ }
+ for (std::deque<pn_delivery_t*>::iterator i = copy.begin(); i != copy.end(); ++i) {
+ accepted(*i, true);
+ }
+ }
+ for (Targets::iterator t = targets.begin(); t != targets.end(); ++t) {
+ if (t->second->flow()) output = true;
+ }
+
+ return output;
+}
+
+void Session::close()
+{
+ for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
+ i->second->detached();
+ }
+ senders.clear();
+ targets.clear();//at present no explicit cleanup required for targets
+ QPID_LOG(debug, "Session closed, all senders cancelled.");
+ qpid::sys::Mutex::ScopedLock l(lock);
+ deleted = true;
+}
+
+void Queue::handle(qpid::broker::Message& message)
+{
+ queue->deliver(message);
+ --window;
+}
+
+void Exchange::handle(qpid::broker::Message& message)
+{
+ DeliverableMessage deliverable(message, 0);
+ exchange->route(deliverable);
+ --window;
+}
+
+bool Target::flow()
+{
+ bool issue = window < credit;
+ if (issue) {
+ pn_link_flow(link, credit - window);//TODO: proper flow control
+ window = credit;
+ }
+ return issue;
+}
+
+bool Target::needFlow()
+{
+ return window <= (credit/2);
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Session.h b/cpp/src/qpid/broker/amqp/Session.h
new file mode 100644
index 0000000000..7dbdaf05fc
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Session.h
@@ -0,0 +1,87 @@
+#ifndef QPID_BROKER_AMQP1_SESSION_H
+#define QPID_BROKER_AMQP1_SESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/OutputControl.h"
+#include "qpid/broker/amqp/ManagedSession.h"
+#include <deque>
+#include <map>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+
+struct pn_delivery_t;
+struct pn_link_t;
+struct pn_session_t;
+struct pn_terminus_t;
+
+namespace qpid {
+namespace broker {
+
+class Broker;
+class Exchange;
+class Queue;
+
+namespace amqp {
+
+class ManagedConnection;
+class Outgoing;
+class Target;
+/**
+ *
+ */
+class Session : public ManagedSession, public boost::enable_shared_from_this<Session>
+{
+ public:
+ Session(pn_session_t*, qpid::broker::Broker&, ManagedConnection&, qpid::sys::OutputControl&);
+ void attach(pn_link_t*);
+ void detach(pn_link_t*);
+ void incoming(pn_link_t*, pn_delivery_t*);
+ void outgoing(pn_link_t*, pn_delivery_t*);
+ bool dispatch();
+ void close();
+
+ //called when a transfer is completly processed (e.g.including stored on disk)
+ void accepted(pn_delivery_t*, bool sync);
+ private:
+ typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > Senders;
+ typedef std::map<pn_link_t*, boost::shared_ptr<Target> > Targets;
+ pn_session_t* session;
+ qpid::broker::Broker& broker;
+ ManagedConnection& connection;
+ qpid::sys::OutputControl& out;
+ Targets targets;
+ Senders senders;
+ std::deque<pn_delivery_t*> completed;
+ bool deleted;
+ qpid::sys::Mutex lock;
+ struct ResolvedNode
+ {
+ boost::shared_ptr<qpid::broker::Exchange> exchange;
+ boost::shared_ptr<qpid::broker::Queue> queue;
+ };
+
+ ResolvedNode resolve(const std::string name, pn_terminus_t* terminus);
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP1_SESSION_H*/
diff --git a/cpp/src/qpid/broker/amqp/Translation.cpp b/cpp/src/qpid/broker/amqp/Translation.cpp
new file mode 100644
index 0000000000..ca2094b965
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Translation.cpp
@@ -0,0 +1,241 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Translation.h"
+#include "qpid/broker/amqp/Outgoing.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/amqp/Decoder.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/types/Variant.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+
+const std::string EMPTY;
+const std::string FORWARD_SLASH("/");
+
+std::string translate(const qpid::framing::ReplyTo r)
+{
+ if (r.hasExchange()) {
+ if (r.hasRoutingKey()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey();
+ else return r.getExchange();
+ } else return r.getRoutingKey();
+}
+std::string translate(const qpid::amqp::CharSequence& chars)
+{
+ if (chars.data && chars.size) return std::string(chars.data, chars.size);
+ else return EMPTY;
+}
+bool setMessageId(qpid::framing::MessageProperties& m, const qpid::amqp::CharSequence& chars)
+{
+ if (chars.data && chars.size) {
+ if (chars.size == 16) {
+ m.setMessageId(qpid::framing::Uuid(chars.data));
+ return true;
+ } else {
+ std::istringstream in(translate(chars));
+ qpid::framing::Uuid uuid;
+ in >> uuid;
+ if (!in.fail()) {
+ m.setMessageId(uuid);
+ return true;
+ }
+ }
+ }
+ return false;
+}
+class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties
+{
+ public:
+ bool hasMessageId() const { return messageProperties && messageProperties->hasMessageId(); }
+ std::string getMessageId() const { return messageProperties ? messageProperties->getMessageId().str() : EMPTY; }
+ bool hasUserId() const { return messageProperties && messageProperties->hasUserId(); }
+ std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; }
+ bool hasTo() const { return getDestination().size() || hasSubject(); }
+ std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); }
+ bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); }
+ std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; }
+ bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); }
+ std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; }
+ bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); }
+ std::string getCorrelationId() const { return messageProperties ? messageProperties->getCorrelationId() : EMPTY; }
+ bool hasContentType() const { return messageProperties && messageProperties->hasContentType(); }
+ std::string getContentType() const { return messageProperties ? messageProperties->getContentType() : EMPTY; }
+ bool hasContentEncoding() const { return messageProperties && messageProperties->hasContentEncoding(); }
+ std::string getContentEncoding() const { return messageProperties ? messageProperties->getContentEncoding() : EMPTY; }
+ bool hasAbsoluteExpiryTime() const { return deliveryProperties && deliveryProperties->hasExpiration(); }
+ int64_t getAbsoluteExpiryTime() const { return deliveryProperties ? deliveryProperties->getExpiration() : 0; }
+ bool hasCreationTime() const { return false; }
+ int64_t getCreationTime() const { return 0; }
+ bool hasGroupId() const {return false; }
+ std::string getGroupId() const { return EMPTY; }
+ bool hasGroupSequence() const { return false; }
+ uint32_t getGroupSequence() const { return 0; }
+ bool hasReplyToGroupId() const { return false; }
+ std::string getReplyToGroupId() const { return EMPTY; }
+
+ const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); }
+ Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t),
+ messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()),
+ deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>())
+ {}
+ private:
+ const qpid::broker::amqp_0_10::MessageTransfer& transfer;
+ const qpid::framing::MessageProperties* messageProperties;
+ const qpid::framing::DeliveryProperties* deliveryProperties;
+
+ std::string getDestination() const
+ {
+ return transfer.getMethod<qpid::framing::MessageTransferBody>()->getDestination();
+ }
+};
+}
+
+Translation::Translation(const qpid::broker::Message& m) : original(m) {}
+
+
+boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
+{
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t =
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>(dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding()));
+ if (t) {
+ return t;//no translation required
+ } else {
+ const Message* message = dynamic_cast<const Message*>(&original.getEncoding());
+ if (message) {
+ //translate 1.0 message into 0-10
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+ qpid::framing::AMQFrame method((qpid::framing::MessageTransferBody(qpid::framing::ProtocolVersion(), EMPTY, 0, 0)));
+ qpid::framing::AMQFrame header((qpid::framing::AMQHeaderBody()));
+ qpid::framing::AMQFrame content((qpid::framing::AMQContentBody()));
+ method.setEof(false);
+ header.setBof(false);
+ header.setEof(false);
+ content.setBof(false);
+
+ transfer->getFrames().append(method);
+ transfer->getFrames().append(header);
+
+ qpid::amqp::CharSequence body = message->getBody();
+ content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
+ transfer->getFrames().append(content);
+
+ qpid::framing::MessageProperties* props =
+ transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true);
+ props->setContentLength(body.size);
+
+ qpid::amqp::MessageId mid = message->getMessageId();
+ qpid::framing::Uuid uuid;
+ switch (mid.type) {
+ case qpid::amqp::MessageId::UUID:
+ case qpid::amqp::MessageId::BYTES:
+ if (mid.value.bytes.size == 0) break;
+ if (setMessageId(*props, mid.value.bytes)) break;
+ case qpid::amqp::MessageId::ULONG:
+ QPID_LOG(info, "Skipping message id in translation from 1.0 to 0-10 as it is not a UUID");
+ break;
+ }
+
+ qpid::amqp::MessageId cid = message->getCorrelationId();
+ switch (cid.type) {
+ case qpid::amqp::MessageId::UUID:
+ assert(cid.value.bytes.size = 16);
+ props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str());
+ break;
+ case qpid::amqp::MessageId::BYTES:
+ if (cid.value.bytes.size) {
+ props->setCorrelationId(translate(cid.value.bytes));
+ }
+ break;
+ case qpid::amqp::MessageId::ULONG:
+ props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong));
+ break;
+ }
+ // TODO: ReplyTo - there is no way to reliably determine
+ // the type of the node from just its name, unless we
+ // query the brokers registries
+
+ if (message->getContentType()) props->setContentType(translate(message->getContentType()));
+ if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding()));
+ props->setUserId(message->getUserId());
+ // TODO: FieldTable applicationHeaders;
+ qpid::amqp::CharSequence ap = message->getApplicationProperties();
+ if (ap) {
+ qpid::amqp::Decoder d(ap.data, ap.size);
+ qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders());
+ }
+
+ qpid::framing::DeliveryProperties* dp =
+ transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true);
+ dp->setPriority(message->getPriority());
+ if (message->isPersistent()) dp->setDeliveryMode(2);
+ if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey());
+
+ return transfer.get();
+ } else {
+ throw qpid::Exception("Could not write message data in AMQP 0-10 format");
+ }
+ }
+}
+
+void Translation::write(Outgoing& out)
+{
+ const Message* message = dynamic_cast<const Message*>(&original.getEncoding());
+ if (message) {
+ //write annotations
+ //TODO: merge in any newly added annotations
+ qpid::amqp::CharSequence deliveryAnnotations = message->getDeliveryAnnotations();
+ qpid::amqp::CharSequence messageAnnotations = message->getMessageAnnotations();
+ if (deliveryAnnotations.size) out.write(deliveryAnnotations.data, deliveryAnnotations.size);
+ if (messageAnnotations.size) out.write(messageAnnotations.data, messageAnnotations.size);
+ //write bare message
+ qpid::amqp::CharSequence bareMessage = message->getBareMessage();
+ if (bareMessage.size) out.write(bareMessage.data, bareMessage.size);
+ //write footer:
+ qpid::amqp::CharSequence footer = message->getFooter();
+ if (footer.size) out.write(footer.data, footer.size);
+ } else {
+ const qpid::broker::amqp_0_10::MessageTransfer* transfer = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding());
+ if (transfer) {
+ Properties_0_10 properties(*transfer);
+ qpid::types::Variant::Map applicationProperties;
+ qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties);
+ std::string content = transfer->getContent();
+ size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
+ std::vector<char> buffer(size);
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeProperties(properties);
+ encoder.writeApplicationProperties(applicationProperties);
+ encoder.writeBinary(content, &qpid::amqp::message::DATA);
+ out.write(&buffer[0], encoder.getPosition());
+ } else {
+ QPID_LOG(error, "Could not write message data in AMQP 1.0 format");
+ }
+ }
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Translation.h b/cpp/src/qpid/broker/amqp/Translation.h
new file mode 100644
index 0000000000..64d96560e3
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Translation.h
@@ -0,0 +1,58 @@
+#ifndef QPID_BROKER_AMQP_TRANSLATION_H
+#define QPID_BROKER_AMQP_TRANSLATION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class Message;
+namespace amqp_0_10 {
+class MessageTransfer;
+}
+namespace amqp {
+
+class Outgoing;
+/**
+ *
+ */
+class Translation
+{
+ public:
+ Translation(const qpid::broker::Message& message);
+
+ /**
+ * @returns a pointer to an AMQP 0-10 message transfer suitable
+ * for sending on an 0-10 session, translating from 1.0 as
+ * necessary
+ */
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> getTransfer();
+ /**
+ * Writes the AMQP 1.0 bare message and any annotations, translating from 0-10 if necessary
+ */
+ void write(Outgoing&);
+ private:
+ const qpid::broker::Message& original;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_TRANSLATION_H*/