diff options
Diffstat (limited to 'qpid/cpp/src/qpid/messaging')
63 files changed, 9435 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/messaging/Address.cpp b/qpid/cpp/src/qpid/messaging/Address.cpp new file mode 100644 index 0000000000..6fbaeef661 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Address.cpp @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" +#include "qpid/messaging/AddressImpl.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/framing/Uuid.h" +#include <sstream> +#include <boost/format.hpp> + +namespace qpid { +namespace messaging { + +using namespace qpid::types; + +namespace { +const std::string SUBJECT_DIVIDER = "/"; +const std::string OPTIONS_DIVIDER = ";"; +const std::string SPACE = " "; +const std::string TYPE = "type"; +} +Address::Address() : impl(new AddressImpl()) {} +Address::Address(const std::string& address) : impl(new AddressImpl()) +{ + AddressParser parser(address); + parser.parse(*this); +} +Address::Address(const std::string& name, const std::string& subject, const Variant::Map& options, + const std::string& type) + : impl(new AddressImpl(name, subject, options)) { setType(type); } +Address::Address(const Address& a) : + impl(new AddressImpl(a.impl->name, a.impl->subject, a.impl->options)) { impl->temporary = a.impl->temporary; } +Address::~Address() { delete impl; } + +Address& Address::operator=(const Address& a) { *impl = *a.impl; return *this; } + + +std::string Address::str() const +{ + std::stringstream out; + out << impl->name; + if (!impl->subject.empty()) out << SUBJECT_DIVIDER << impl->subject; + if (!impl->options.empty()) out << OPTIONS_DIVIDER << impl->options; + return out.str(); +} +Address::operator bool() const { return !impl->name.empty(); } +bool Address::operator !() const { return impl->name.empty(); } + +const std::string& Address::getName() const { return impl->name; } +void Address::setName(const std::string& name) { impl->name = name; } +const std::string& Address::getSubject() const { return impl->subject; } +void Address::setSubject(const std::string& subject) { impl->subject = subject; } +const Variant::Map& Address::getOptions() const { return impl->options; } +Variant::Map& Address::getOptions() { return impl->options; } +void Address::setOptions(const Variant::Map& options) { impl->options = options; } + + +namespace{ +const Variant EMPTY_VARIANT; +const std::string EMPTY_STRING; +const std::string NODE_PROPERTIES="node"; +} + +const Variant& find(const Variant::Map& map, const std::string& key) +{ + Variant::Map::const_iterator i = map.find(key); + if (i == map.end()) return EMPTY_VARIANT; + else return i->second; +} + +std::string Address::getType() const +{ + const Variant& props = find(impl->options, NODE_PROPERTIES); + if (props.getType() == VAR_MAP) { + const Variant& type = find(props.asMap(), TYPE); + if (!type.isVoid()) return type.asString(); + } + return EMPTY_STRING; +} + +void Address::setType(const std::string& type) +{ + Variant& props = impl->options[NODE_PROPERTIES]; + if (props.isVoid()) props = Variant::Map(); + props.asMap()[TYPE] = type; +} + +std::ostream& operator<<(std::ostream& out, const Address& address) +{ + out << address.str(); + return out; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/AddressImpl.h b/qpid/cpp/src/qpid/messaging/AddressImpl.h new file mode 100644 index 0000000000..8d34bd73c4 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/AddressImpl.h @@ -0,0 +1,45 @@ +#ifndef QPID_MESSAGING_ADDRESSIMPL_H +#define QPID_MESSAGING_ADDRESSIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" +#include "qpid/types/Variant.h" +namespace qpid { +namespace messaging { + +class AddressImpl +{ + public: + std::string name; + std::string subject; + qpid::types::Variant::Map options; + bool temporary; + + AddressImpl() : temporary(false) {} + AddressImpl(const std::string& n, const std::string& s, const qpid::types::Variant::Map& o) : + name(n), subject(s), options(o), temporary(false) {} + static void setTemporary(Address& a, bool value) { a.impl->temporary = value; } + static bool isTemporary(const Address& a) { return a.impl->temporary; } +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_ADDRESSIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/AddressParser.cpp b/qpid/cpp/src/qpid/messaging/AddressParser.cpp new file mode 100644 index 0000000000..882deba463 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/AddressParser.cpp @@ -0,0 +1,271 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "AddressParser.h" +#include "AddressImpl.h" +#include "qpid/framing/Uuid.h" +#include <boost/format.hpp> + +namespace qpid { +namespace messaging { + +using namespace qpid::types; + +AddressParser::AddressParser(const std::string& s) : input(s), current(0) {} + +bool AddressParser::error(const std::string& message) +{ + throw MalformedAddress((boost::format("%1%, character %2% of %3%") % message % current % input).str()); +} + +bool AddressParser::parse(Address& address) +{ + std::string name; + if (readName(name)) { + if (name.find('#') == 0) { + name = qpid::framing::Uuid(true).str() + name; + AddressImpl::setTemporary(address, true); + } + address.setName(name); + if (readChar('/')) { + std::string subject; + readSubject(subject); + address.setSubject(subject); + } + if (readChar(';')) { + Variant options = Variant::Map(); + if (readMap(options)) { + address.setOptions(options.asMap()); + } + } + //skip trailing whitespace + while (!eos() && iswhitespace()) ++current; + return eos() || error("Unexpected chars in address: " + input.substr(current)); + } else { + return input.empty() || error("Expected name"); + } +} + +bool AddressParser::parseMap(Variant::Map& map) +{ + if (readChar('{')) { + readMapEntries(map); + return readChar('}') || error("Unmatched '{'!"); + } else { + return false; + } +} + +bool AddressParser::parseList(Variant::List& list) +{ + if (readChar('[')) { + readListItems(list); + return readChar(']') || error("Unmatched '['!"); + } else { + return false; + } +} + + +bool AddressParser::readList(Variant& value) +{ + if (readChar('[')) { + value = Variant::List(); + readListItems(value.asList()); + return readChar(']') || error("Unmatched '['!"); + } else { + return false; + } +} + +void AddressParser::readListItems(Variant::List& list) +{ + Variant item; + while (readValueIfExists(item)) { + list.push_back(item); + if (!readChar(',')) break; + } +} + +bool AddressParser::readMap(Variant& value) +{ + if (readChar('{')) { + value = Variant::Map(); + readMapEntries(value.asMap()); + return readChar('}') || error("Unmatched '{'!"); + } else { + return false; + } +} + +void AddressParser::readMapEntries(Variant::Map& map) +{ + while (readKeyValuePair(map) && readChar(',')) {} +} + +bool AddressParser::readKeyValuePair(Variant::Map& map) +{ + std::string key; + Variant value; + if (readKey(key)) { + if (readChar(':') && readValue(value)) { + map[key] = value; + return true; + } else { + return error("Bad key-value pair, expected ':'"); + } + } else { + return false; + } +} + +bool AddressParser::readKey(std::string& key) +{ + return readWord(key) || readQuotedString(key); +} + +bool AddressParser::readValue(Variant& value) +{ + return readValueIfExists(value) || error("Expected value"); +} + +bool AddressParser::readValueIfExists(Variant& value) +{ + return readSimpleValue(value) || readQuotedValue(value) || + readMap(value) || readList(value); +} + +bool AddressParser::readString(std::string& value, char delimiter) +{ + if (readChar(delimiter)) { + std::string::size_type start = current; + while (!eos()) { + if (input.at(current) == delimiter) { + if (current > start) { + value = input.substr(start, current - start); + } else { + value = ""; + } + ++current; + return true; + } else { + ++current; + } + } + return error("Unmatched delimiter"); + } else { + return false; + } +} + +bool AddressParser::readName(std::string& name) +{ + return readQuotedString(name) || readWord(name, "/;"); +} + +bool AddressParser::readSubject(std::string& subject) +{ + return readQuotedString(subject) || readWord(subject, ";"); +} + +bool AddressParser::readQuotedString(std::string& s) +{ + return readString(s, '"') || readString(s, '\''); +} + +bool AddressParser::readQuotedValue(Variant& value) +{ + std::string s; + if (readQuotedString(s)) { + value = s; + value.setEncoding("utf8"); + return true; + } else { + return false; + } +} + +bool AddressParser::readSimpleValue(Variant& value) +{ + std::string s; + if (readWord(s)) { + value.parse(s); + if (value.getType() == VAR_STRING) value.setEncoding("utf8"); + return true; + } else { + return false; + } +} + +bool AddressParser::readWord(std::string& value, const std::string& delims) +{ + //skip leading whitespace + while (!eos() && iswhitespace()) ++current; + + //read any number of non-whitespace, non-reserved chars into value + std::string::size_type start = current; + while (!eos() && !iswhitespace() && !in(delims)) ++current; + + if (current > start) { + value = input.substr(start, current - start); + return true; + } else { + return false; + } +} + +bool AddressParser::readChar(char c) +{ + while (!eos()) { + if (iswhitespace()) { + ++current; + } else if (input.at(current) == c) { + ++current; + return true; + } else { + return false; + } + } + return false; +} + +bool AddressParser::iswhitespace() +{ + return ::isspace(input.at(current)); +} + +bool AddressParser::isreserved() +{ + return in(RESERVED); +} + +bool AddressParser::in(const std::string& chars) +{ + return chars.find(input.at(current)) != std::string::npos; +} + +bool AddressParser::eos() +{ + return current >= input.size(); +} + +const std::string AddressParser::RESERVED = "\'\"{}[],:/"; + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/AddressParser.h b/qpid/cpp/src/qpid/messaging/AddressParser.h new file mode 100644 index 0000000000..5e429e1ca9 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/AddressParser.h @@ -0,0 +1,67 @@ +#ifndef QPID_MESSAGING_ADDRESSPARSER_H +#define QPID_MESSAGING_ADDRESSPARSER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/ImportExport.h" +#include "qpid/messaging/Address.h" + +namespace qpid { +namespace messaging { + +class AddressParser +{ + public: + QPID_MESSAGING_EXTERN AddressParser(const std::string&); + bool parse(Address& address); + QPID_MESSAGING_EXTERN bool parseMap(qpid::types::Variant::Map& map); + QPID_MESSAGING_EXTERN bool parseList(qpid::types::Variant::List& list); + private: + const std::string& input; + std::string::size_type current; + static const std::string RESERVED; + + bool readChar(char c); + bool readQuotedString(std::string& s); + bool readQuotedValue(qpid::types::Variant& value); + bool readString(std::string& value, char delimiter); + bool readWord(std::string& word, const std::string& delims = RESERVED); + bool readSimpleValue(qpid::types::Variant& word); + bool readKey(std::string& key); + bool readValue(qpid::types::Variant& value); + bool readValueIfExists(qpid::types::Variant& value); + bool readKeyValuePair(qpid::types::Variant::Map& map); + bool readMap(qpid::types::Variant& value); + bool readList(qpid::types::Variant& value); + bool readName(std::string& name); + bool readSubject(std::string& subject); + bool error(const std::string& message); + bool eos(); + bool iswhitespace(); + bool in(const std::string& delims); + bool isreserved(); + void readListItems(qpid::types::Variant::List& list); + void readMapEntries(qpid::types::Variant::Map& map); +}; + +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_ADDRESSPARSER_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp new file mode 100644 index 0000000000..c40d32cbc1 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Connection.cpp @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/messaging/ConnectionImpl.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/SessionImpl.h" +#include "qpid/messaging/PrivateImplRef.h" +#include "qpid/messaging/ProtocolRegistry.h" +#include "qpid/client/amqp0_10/ConnectionImpl.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace messaging { + +// Explicitly instantiate Handle superclass +template class Handle<ConnectionImpl>; + +using namespace qpid::types; + +typedef PrivateImplRef<qpid::messaging::Connection> PI; + +Connection::Connection(ConnectionImpl* impl) { PI::ctor(*this, impl); } +Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::copy(*this, c); } +Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); } +Connection::~Connection() { PI::dtor(*this); } + +Connection::Connection(const std::string& url, const std::string& o) +{ + Variant::Map options; + AddressParser parser(o); + if (o.empty() || parser.parseMap(options)) { + PI::ctor(*this, ProtocolRegistry::create(url, options)); + } else { + throw InvalidOptionString("Invalid option string: " + o); + } +} +Connection::Connection(const std::string& url, const Variant::Map& options) +{ + PI::ctor(*this, ProtocolRegistry::create(url, options)); +} + +Connection::Connection() +{ + Variant::Map options; + std::string url = "127.0.0.1:5672"; + PI::ctor(*this, ProtocolRegistry::create(url, options)); +} + +void Connection::open() +{ + while (true) { + try { + impl->open(); + return; + } catch (const ProtocolVersionError& e) { + PI::set(*this, ProtocolRegistry::next(PI::get(impl).get())); + QPID_LOG(info, e.what() << ", trying alternative protocol version..."); + } + } +} +bool Connection::isOpen() { return impl->isOpen(); } +bool Connection::isOpen() const { return impl->isOpen(); } +void Connection::close() { impl->close(); } +Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); } +Session Connection::createTransactionalSession(const std::string& name) +{ + return impl->newSession(true, name); +} +Session Connection::getSession(const std::string& name) const { return impl->getSession(name); } +void Connection::setOption(const std::string& name, const Variant& value) +{ + impl->setOption(name, value); +} +std::string Connection::getAuthenticatedUsername() +{ + return impl->getAuthenticatedUsername(); +} + +void Connection::reconnect(const std::string& url) +{ + impl->reconnect(url); +} +void Connection::reconnect() +{ + impl->reconnect(); +} +std::string Connection::getUrl() const +{ + return impl->getUrl(); +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h new file mode 100644 index 0000000000..05d835b282 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h @@ -0,0 +1,60 @@ +#ifndef QPID_MESSAGING_CONNECTIONIMPL_H +#define QPID_MESSAGING_CONNECTIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include <boost/function.hpp> +#include "qpid/RefCounted.h" + +namespace qpid { + +namespace types { +class Variant; +} + +namespace messaging { + +class ProtocolRegistry; +class Session; + +class ConnectionImpl : public virtual qpid::RefCounted +{ + public: + virtual ~ConnectionImpl() {} + virtual void open() = 0; + virtual bool isOpen() const = 0; + virtual void close() = 0; + virtual Session newSession(bool transactional, const std::string& name) = 0; + virtual Session getSession(const std::string& name) const = 0; + virtual void setOption(const std::string& name, const qpid::types::Variant& value) = 0; + virtual std::string getAuthenticatedUsername() = 0; + virtual void reconnect(const std::string& url) = 0; + virtual void reconnect() = 0; + virtual std::string getUrl() const = 0; + private: + friend class ProtocolRegistry; + boost::function<ConnectionImpl*()> next; +}; + +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_CONNECTIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp new file mode 100644 index 0000000000..10c131c22f --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp @@ -0,0 +1,131 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/ConnectionOptions.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/types/Variant.h" +#include "qpid/log/Statement.h" +#include <algorithm> +#include <limits> + +namespace qpid { +namespace messaging { + +namespace { +double FOREVER(std::numeric_limits<double>::max()); + +double timeValue(const qpid::types::Variant& value) { + if (types::isIntegerType(value.getType())) + return double(value.asInt64()); + return value.asDouble(); +} + +void merge(const std::string& value, std::vector<std::string>& list) { + if (std::find(list.begin(), list.end(), value) == list.end()) + list.push_back(value); +} + +void merge(const qpid::types::Variant::List& from, std::vector<std::string>& to) +{ + for (qpid::types::Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) + merge(i->asString(), to); +} + +} + +ConnectionOptions::ConnectionOptions(const std::map<std::string, qpid::types::Variant>& options) + : replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2), + retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false), setToOnSend(false) +{ + for (qpid::types::Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) { + set(i->first, i->second); + } +} + +void ConnectionOptions::set(const std::string& name, const qpid::types::Variant& value) +{ + if (name == "reconnect") { + reconnect = value; + } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { + timeout = timeValue(value); + } else if (name == "reconnect-limit" || name == "reconnect_limit") { + limit = value; + } else if (name == "reconnect-interval" || name == "reconnect_interval") { + maxReconnectInterval = minReconnectInterval = timeValue(value); + } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") { + minReconnectInterval = timeValue(value); + } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") { + maxReconnectInterval = timeValue(value); + } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") { + replaceUrls = value.asBool(); + } else if (name == "reconnect-urls" || name == "reconnect_urls") { + if (replaceUrls) urls.clear(); + if (value.getType() == qpid::types::VAR_LIST) { + merge(value.asList(), urls); + } else { + merge(value.asString(), urls); + } + } else if (name == "username") { + username = value.asString(); + } else if (name == "password") { + password = value.asString(); + } else if (name == "sasl-mechanism" || name == "sasl_mechanism" || + name == "sasl-mechanisms" || name == "sasl_mechanisms") { + mechanism = value.asString(); + } else if (name == "sasl-service" || name == "sasl_service") { + service = value.asString(); + } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") { + minSsf = value; + } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") { + maxSsf = value; + } else if (name == "heartbeat") { + heartbeat = value; + } else if (name == "tcp-nodelay" || name == "tcp_nodelay") { + tcpNoDelay = value; + } else if (name == "locale") { + locale = value.asString(); + } else if (name == "max-channels" || name == "max_channels") { + maxChannels = value; + } else if (name == "max-frame-size" || name == "max_frame_size") { + maxFrameSize = value; + } else if (name == "bounds") { + bounds = value; + } else if (name == "transport") { + protocol = value.asString(); + } else if (name == "ssl-cert-name" || name == "ssl_cert_name") { + sslCertName = value.asString(); + } else if (name == "ssl-ignore-hostname-verification-failure" || name == "ssl_ignore_hostname_verification_failure") { + sslIgnoreHostnameVerificationFailure = value; + } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") { + reconnectOnLimitExceeded = value; + } else if (name == "container-id" || name == "container_id") { + identifier = value.asString(); + } else if (name == "nest-annotations" || name == "nest_annotations") { + nestAnnotations = value; + } else if (name == "set-to-on-send" || name == "set_to_on_send") { + setToOnSend = value; + } else if (name == "properties" || name == "client-properties" || name == "client_properties") { + properties = value.asMap(); + } else { + throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); + } +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h new file mode 100644 index 0000000000..c8c8798b7b --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h @@ -0,0 +1,57 @@ +#ifndef QPID_MESSAGING_CONNECTIONOPTIONS_H +#define QPID_MESSAGING_CONNECTIONOPTIONS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/ImportExport.h" + +#include "qpid/client/ConnectionSettings.h" +#include <map> +#include <vector> + +namespace qpid { +namespace types { +class Variant; +} +namespace messaging { + +struct ConnectionOptions : qpid::client::ConnectionSettings +{ + std::vector<std::string> urls; + bool replaceUrls; + bool reconnect; + double timeout; + int32_t limit; + double minReconnectInterval; + double maxReconnectInterval; + int32_t retries; + bool reconnectOnLimitExceeded; + std::string identifier; + bool nestAnnotations; + bool setToOnSend; + std::map<std::string, qpid::types::Variant> properties; + + QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&); + QPID_MESSAGING_EXTERN void set(const std::string& name, const qpid::types::Variant& value); +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_CONNECTIONOPTIONS_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Duration.cpp b/qpid/cpp/src/qpid/messaging/Duration.cpp new file mode 100644 index 0000000000..a23e9f5bcb --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Duration.cpp @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Duration.h" +#include <limits> + +namespace qpid { +namespace messaging { + +Duration::Duration(uint64_t ms) : milliseconds(ms) {} +uint64_t Duration::getMilliseconds() const { return milliseconds; } + +Duration operator*(const Duration& duration, uint64_t multiplier) +{ + return Duration(duration.getMilliseconds() * multiplier); +} + +Duration operator*(uint64_t multiplier, const Duration& duration) +{ + return Duration(duration.getMilliseconds() * multiplier); +} + +bool operator==(const Duration& a, const Duration& b) +{ + return a.getMilliseconds() == b.getMilliseconds(); +} + +bool operator!=(const Duration& a, const Duration& b) +{ + return a.getMilliseconds() != b.getMilliseconds(); +} + +const Duration Duration::FOREVER(std::numeric_limits<uint64_t>::max()); +const Duration Duration::IMMEDIATE(0); +const Duration Duration::SECOND(1000); +const Duration Duration::MINUTE(SECOND * 60); + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/FailoverUpdates.cpp b/qpid/cpp/src/qpid/messaging/FailoverUpdates.cpp new file mode 100644 index 0000000000..4f2fcf2e82 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/FailoverUpdates.cpp @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/FailoverUpdates.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Session.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include "qpid/log/Statement.h" +#include "qpid/Exception.h" +#include "qpid/Url.h" +#include "qpid/framing/Uuid.h" +#include <vector> + +namespace qpid { +namespace messaging { +using framing::Uuid; + +struct FailoverUpdatesImpl : qpid::sys::Runnable +{ + Connection connection; + Session session; + Receiver receiver; + qpid::sys::Thread thread; + + FailoverUpdatesImpl(Connection& c) : connection(c) + { + session = connection.createSession("failover-updates."+Uuid(true).str()); + receiver = session.createReceiver("amq.failover"); + thread = qpid::sys::Thread(*this); + } + + ~FailoverUpdatesImpl() { + try { + session.close(); + } catch(...) {} // Squash exceptions in a destructor. + thread.join(); + } + + void run() + { + try { + Message message; + while (receiver.fetch(message)) { + connection.setOption("reconnect-urls", message.getProperties()["amq.failover"]); + QPID_LOG(debug, "Set reconnect-urls to " << message.getProperties()["amq.failover"]); + session.acknowledge(); + } + } + catch (const ClosedException&) {} + catch (const qpid::TransportFailure& e) { + QPID_LOG(warning, "Failover updates stopped on loss of connection. " << e.what()); + } + catch (const std::exception& e) { + QPID_LOG(warning, "Failover updates stopped due to exception: " << e.what()); + } + } +}; + +FailoverUpdates::FailoverUpdates(Connection& connection) : impl(new FailoverUpdatesImpl(connection)) {} +FailoverUpdates::~FailoverUpdates() { if (impl) { delete impl; } } +FailoverUpdates::FailoverUpdates(const FailoverUpdates&) : impl(0) {} +FailoverUpdates& FailoverUpdates::operator=(const FailoverUpdates&) { return *this; } + + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/Logger.cpp b/qpid/cpp/src/qpid/messaging/Logger.cpp new file mode 100644 index 0000000000..c259cb4c1b --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Logger.cpp @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/messaging/Logger.h" + +#include "qpid/log/Logger.h" +#include "qpid/log/OstreamOutput.h" +#include "qpid/messaging/exceptions.h" + +#include <sstream> +#include <string> +#include <vector> + +using std::string; +using std::vector; + +namespace qpid { +namespace messaging { + +// Proxy class to call the users output class/routine +class ProxyOutput : public qpid::log::Logger::Output { + LoggerOutput& output; + + void log(const qpid::log::Statement& s, const string& message) { + output.log(qpid::messaging::Level(s.level), s.category==qpid::log::external_application, s.file, s.line, s.function, message); + } + +public: + ProxyOutput(LoggerOutput& o) : + output(o) + {} +}; + +LoggerOutput::~LoggerOutput() +{ +} + +inline qpid::log::Logger& logger() { + static qpid::log::Logger& theLogger=qpid::log::Logger::instance(); + return theLogger; +} + +namespace { + std::string loggerUsage; + qpid::log::Selector loggerSelector; +} + +std::string Logger::usage() +{ + return loggerUsage; +} + +void Logger::configure(int argc, const char* argv[], const string& pre) +try +{ + bool logToStdout = false; + bool logToStderr = false; + string logFile; + std::vector<std::string> selectors; + std::vector<std::string> deselectors; + bool time = false; + bool level = false; + bool thread = false; + bool source = false; + bool function = false; + bool hiresTs = false; + + selectors.push_back("notice+"); // Set this for the usage message default + + string prefix = pre.empty() ? pre : pre+"-"; + qpid::Options myOptions; + myOptions.addOptions() + ((prefix+"log-enable").c_str(), optValue(selectors, "RULE"), + ("Enables logging for selected levels and components. " + "RULE is in the form 'LEVEL[+-][:PATTERN]'\n" + "LEVEL is one of: \n\t "+qpid::log::getLevels()+"\n" + "PATTERN is a logging category name, or a namespace-qualified " + "function name or name fragment. " + "Logging category names are: \n\t "+qpid::log::getCategories()+"\n" + "The category \"Application\" contains all messages logged by the application.\n" + "For example:\n" + "\t'--log-enable warning+'\n" + "logs all warning, error and critical messages.\n" + "\t'--log-enable trace+:Application --log-enable notice+'\n" + "logs all application messages, but only notice or higher for the qpid library messages\n" + "\t'--log-enable debug:framing'\n" + "logs debug messages from all functions with 'framing' in the namespace or function name.\n" + "This option can be used multiple times").c_str()) + ((prefix+"log-disable").c_str(), optValue(deselectors, "RULE"), + ("Disables logging for selected levels and components. " + "RULE is in the form 'LEVEL[+-][:PATTERN]'\n" + "LEVEL is one of: \n\t "+qpid::log::getLevels()+"\n" + "PATTERN is a logging category name, or a namespace-qualified " + "function name or name fragment. " + "Logging category names are: \n\t "+qpid::log::getCategories()+"\n" + "For example:\n" + "\t'--log-disable warning-'\n" + "disables logging all warning, notice, info, debug, and trace messages.\n" + "\t'--log-disable trace:Application'\n" + "disables all application trace messages.\n" + "\t'--log-disable debug-:qmf::'\n" + "disables logging debug and trace messages from all functions with 'qmf::' in the namespace.\n" + "This option can be used multiple times").c_str()) + ((prefix+"log-time").c_str(), optValue(time, "yes|no"), "Include time in log messages") + ((prefix+"log-level").c_str(), optValue(level,"yes|no"), "Include severity level in log messages") + ((prefix+"log-source").c_str(), optValue(source,"yes|no"), "Include source file:line in log messages") + ((prefix+"log-thread").c_str(), optValue(thread,"yes|no"), "Include thread ID in log messages") + ((prefix+"log-function").c_str(), optValue(function,"yes|no"), "Include function signature in log messages") + ((prefix+"log-hires-timestamp").c_str(), optValue(hiresTs,"yes|no"), "Use hi-resolution timestamps in log messages") + ((prefix+"log-to-stderr").c_str(), optValue(logToStderr, "yes|no"), "Send logging output to stderr") + ((prefix+"log-to-stdout").c_str(), optValue(logToStdout, "yes|no"), "Send logging output to stdout") + ((prefix+"log-to-file").c_str(), optValue(logFile, "FILE"), "Send log output to FILE.") + ; + + std::ostringstream loggerSStream; + myOptions.print(loggerSStream); + + loggerUsage=loggerSStream.str(); + + selectors.clear(); // Clear to give passed in options precedence + + // Parse the command line not failing for unrecognised options + myOptions.parse(argc, argv, std::string(), true); + + // If no passed in enable or disable log specification then go back to default + if (selectors.empty() && deselectors.empty()) + selectors.push_back("notice+"); + // Set the logger options according to what we just parsed + qpid::log::Options logOptions; + logOptions.selectors = selectors; + logOptions.deselectors = deselectors; + logOptions.time = time; + logOptions.level = level; + logOptions.category = false; + logOptions.thread = thread; + logOptions.source = source; + logOptions.function = function; + logOptions.hiresTs = hiresTs; + + loggerSelector = qpid::log::Selector(logOptions); + logger().clear(); // Need to clear before configuring as it will have been initialised statically already + logger().format(logOptions); + logger().select(loggerSelector); + + // Have to set up the standard output sinks manually + if (logToStderr) + logger().output(std::auto_ptr<qpid::log::Logger::Output> + (new qpid::log::OstreamOutput(std::clog))); + if (logToStdout) + logger().output(std::auto_ptr<qpid::log::Logger::Output> + (new qpid::log::OstreamOutput(std::cout))); + + if (logFile.length() > 0) + logger().output(std::auto_ptr<qpid::log::Logger::Output> + (new qpid::log::OstreamOutput(logFile))); +} +catch (std::exception& e) +{ + throw MessagingException(e.what()); +} + +void Logger::setOutput(LoggerOutput& o) +{ + logger().output(std::auto_ptr<qpid::log::Logger::Output>(new ProxyOutput(o))); +} + +void Logger::log(Level level, const char* file, int line, const char* function, const string& message) +{ + if (loggerSelector.isEnabled(qpid::log::Level(level), function, qpid::log::unspecified)) { + qpid::log::Statement s = { + true, + file, + line, + function, + qpid::log::Level(level), + qpid::log::external_application, + }; + logger().log(s, message); + } +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/Message.cpp b/qpid/cpp/src/qpid/messaging/Message.cpp new file mode 100644 index 0000000000..62ee93b6c2 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Message.cpp @@ -0,0 +1,165 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/amqp_0_10/Codecs.h" +#include <qpid/Exception.h> +#include <boost/format.hpp> + +namespace qpid { +namespace messaging { + +using namespace qpid::types; + +Message::Message(const std::string& bytes) : impl(new MessageImpl(bytes)) {} +Message::Message(const char* bytes, size_t count) : impl(new MessageImpl(bytes, count)) {} +Message::Message(qpid::types::Variant& c) : impl(new MessageImpl(std::string())) +{ + setContentObject(c); +} + +Message::Message(const Message& m) : impl(new MessageImpl(*m.impl)) {} +Message::~Message() { delete impl; } + +Message& Message::operator=(const Message& m) { *impl = *m.impl; return *this; } + +void Message::setReplyTo(const Address& d) { impl->setReplyTo(d); } +const Address& Message::getReplyTo() const { return impl->getReplyTo(); } + +void Message::setSubject(const std::string& s) { impl->setSubject(s); } +const std::string& Message::getSubject() const { return impl->getSubject(); } + +void Message::setContentType(const std::string& s) { impl->setContentType(s); } +const std::string& Message::getContentType() const { return impl->getContentType(); } + +void Message::setMessageId(const std::string& id) { impl->setMessageId(id); } +const std::string& Message::getMessageId() const { return impl->getMessageId(); } + +void Message::setUserId(const std::string& id) { impl->setUserId(id); } +const std::string& Message::getUserId() const { return impl->getUserId(); } + +void Message::setCorrelationId(const std::string& id) { impl->setCorrelationId(id); } +const std::string& Message::getCorrelationId() const { return impl->getCorrelationId(); } + +uint8_t Message::getPriority() const { return impl->getPriority(); } +void Message::setPriority(uint8_t priority) { impl->setPriority(priority); } + +void Message::setTtl(Duration ttl) { impl->setTtl(ttl.getMilliseconds()); } +Duration Message::getTtl() const { return Duration(impl->getTtl()); } + +void Message::setDurable(bool durable) { impl->setDurable(durable); } +bool Message::getDurable() const { return impl->isDurable(); } + +bool Message::getRedelivered() const { return impl->isRedelivered(); } +void Message::setRedelivered(bool redelivered) { impl->setRedelivered(redelivered); } + +const Variant::Map& Message::getProperties() const { return impl->getHeaders(); } +Variant::Map& Message::getProperties() { return impl->getHeaders(); } +void Message::setProperties(const Variant::Map& p) { getProperties() = p; } +void Message::setProperty(const std::string& k, const qpid::types::Variant& v) { impl->setHeader(k,v); } + +void Message::setContent(const std::string& c) { impl->setBytes(c); } +void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); } +std::string Message::getContent() const { return impl->getBytes(); } + +void Message::setContentBytes(const std::string& c) { impl->setBytes(c); } +std::string Message::getContentBytes() const { return impl->getBytes(); } + +qpid::types::Variant& Message::getContentObject() { return impl->getContent(); } +void Message::setContentObject(const qpid::types::Variant& c) { impl->getContent() = c; } +const qpid::types::Variant& Message::getContentObject() const { return impl->getContent(); } + +const char* Message::getContentPtr() const +{ + return impl->getBytes().data(); +} + +size_t Message::getContentSize() const +{ + return impl->getBytes().size(); +} + +EncodingException::EncodingException(const std::string& msg) : qpid::types::Exception(msg) {} + +const std::string BAD_ENCODING("Unsupported encoding: %1% (only %2% is supported at present)."); + +template <class C> struct MessageCodec +{ + static bool checkEncoding(const std::string& requested) + { + if (requested.size()) { + if (requested == C::contentType) return true; + else throw EncodingException((boost::format(BAD_ENCODING) % requested % C::contentType).str()); + } else { + return false; + } + } + + /* + * Currently only support a single encoding type for both list and + * map, based on AMQP 0-10, though wider support is anticipated in the + * future. This method simply checks that the desired encoding (if one + * is specified, either through the message-content or through an + * override) is indeed supported. + */ + static void checkEncoding(const Message& message, const std::string& requested) + { + checkEncoding(requested) || checkEncoding(message.getContentType()); + } + + static void decode(const Message& message, typename C::ObjectType& object, const std::string& encoding) + { + checkEncoding(message, encoding); + try { + C::decode(message.getContent(), object); + } catch (const qpid::Exception &ex) { + throw EncodingException(ex.what()); + } + } + + static void encode(const typename C::ObjectType& map, Message& message, const std::string& encoding) + { + checkEncoding(message, encoding); + std::string content; + C::encode(map, content); + message.setContentType(C::contentType); + message.setContent(content); + } +}; + +void decode(const Message& message, Variant::Map& map, const std::string& encoding) +{ + MessageCodec<qpid::amqp_0_10::MapCodec>::decode(message, map, encoding); +} +void decode(const Message& message, Variant::List& list, const std::string& encoding) +{ + MessageCodec<qpid::amqp_0_10::ListCodec>::decode(message, list, encoding); +} +void encode(const Variant::Map& map, Message& message, const std::string& encoding) +{ + MessageCodec<qpid::amqp_0_10::MapCodec>::encode(map, message, encoding); +} +void encode(const Variant::List& list, Message& message, const std::string& encoding) +{ + MessageCodec<qpid::amqp_0_10::ListCodec>::encode(list, message, encoding); +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp new file mode 100644 index 0000000000..620e48ec2e --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp @@ -0,0 +1,253 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MessageImpl.h" +#include "qpid/messaging/Message.h" + +namespace qpid { +namespace messaging { + +namespace { +const std::string EMPTY_STRING = ""; +} + +using namespace qpid::types; + +MessageImpl::MessageImpl(const std::string& c) : + priority(0), + ttl(0), + durable(false), + redelivered(false), + bytes(c), + contentDecoded(false), + internalId(0) {} +MessageImpl::MessageImpl(const char* chars, size_t count) : + priority(0), + ttl(0), + durable (false), + redelivered(false), + bytes(chars, count), + contentDecoded(false), + internalId(0) {} + +void MessageImpl::clear() +{ + replyTo = Address(); + subject = std::string(); + contentType = std::string(); + messageId = std::string(); + userId= std::string(); + correlationId = std::string(); + priority = 0; + ttl = 0; + durable = false; + redelivered = false; + headers = qpid::types::Variant::Map(); + + bytes = std::string(); + content = qpid::types::Variant(); + contentDecoded = false; + encoded = boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage>(); + internalId = 0; +} + +void MessageImpl::setReplyTo(const Address& d) +{ + replyTo = d; + updated(); +} +const Address& MessageImpl::getReplyTo() const +{ + if (!replyTo && encoded) encoded->getReplyTo(replyTo); + return replyTo; +} + +void MessageImpl::setSubject(const std::string& s) +{ + subject = s; + updated(); +} +const std::string& MessageImpl::getSubject() const +{ + if (!subject.size() && encoded) encoded->getSubject(subject); + return subject; +} + +void MessageImpl::setContentType(const std::string& s) +{ + contentType = s; + updated(); +} +const std::string& MessageImpl::getContentType() const +{ + if (!contentType.size() && encoded) encoded->getContentType(contentType); + return contentType; +} + +void MessageImpl::setMessageId(const std::string& s) +{ + messageId = s; + updated(); +} +const std::string& MessageImpl::getMessageId() const +{ + if (!messageId.size() && encoded) encoded->getMessageId(messageId); + return messageId; +} +void MessageImpl::setUserId(const std::string& s) +{ + userId = s; + updated(); +} +const std::string& MessageImpl::getUserId() const +{ + if (!userId.size() && encoded) encoded->getUserId(userId); + return userId; +} +void MessageImpl::setCorrelationId(const std::string& s) +{ + correlationId = s; + updated(); +} +const std::string& MessageImpl::getCorrelationId() const +{ + if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId); + return correlationId; +} +void MessageImpl::setPriority(uint8_t p) +{ + priority = p; +} +uint8_t MessageImpl::getPriority() const +{ + return priority; +} +void MessageImpl::setTtl(uint64_t t) +{ + ttl = t; +} +uint64_t MessageImpl::getTtl() const +{ + return ttl; +} +void MessageImpl::setDurable(bool d) +{ + durable = d; +} +bool MessageImpl::isDurable() const +{ + return durable; +} +void MessageImpl::setRedelivered(bool b) +{ + redelivered = b; +} +bool MessageImpl::isRedelivered() const +{ + return redelivered; +} + +const Variant::Map& MessageImpl::getHeaders() const +{ + if (!headers.size() && encoded) encoded->populate(headers); + return headers; +} +Variant::Map& MessageImpl::getHeaders() { + if (!headers.size() && encoded) encoded->populate(headers); + updated(); + return headers; +} +void MessageImpl::setHeader(const std::string& key, const qpid::types::Variant& val) +{ + headers[key] = val; updated(); +} + +//should these methods be on MessageContent? +void MessageImpl::setBytes(const std::string& c) +{ + bytes = c; + updated(); +} +void MessageImpl::setBytes(const char* chars, size_t count) +{ + bytes.assign(chars, count); + updated(); +} +const std::string& MessageImpl::getBytes() const +{ + if (encoded && !contentDecoded) { + encoded->getBody(bytes, content); + contentDecoded = true; + } + if (bytes.empty() && content.getType() == VAR_STRING) return content.getString(); + else return bytes; +} +std::string& MessageImpl::getBytes() +{ + updated();//have to assume body may be edited, invalidating our message + if (bytes.empty() && content.getType() == VAR_STRING) return content.getString(); + else return bytes; +} + +qpid::types::Variant& MessageImpl::getContent() +{ + updated();//have to assume content may be edited, invalidating our message + return content; +} + +const qpid::types::Variant& MessageImpl::getContent() const +{ + if (encoded && !contentDecoded) { + encoded->getBody(bytes, content); + contentDecoded = true; + } + return content; +} + +void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; } +qpid::framing::SequenceNumber MessageImpl::getInternalId() { return internalId; } + +void MessageImpl::updated() +{ + + if (!replyTo && encoded) encoded->getReplyTo(replyTo); + if (!subject.size() && encoded) encoded->getSubject(subject); + if (!contentType.size() && encoded) encoded->getContentType(contentType); + if (!messageId.size() && encoded) encoded->getMessageId(messageId); + if (!userId.size() && encoded) encoded->getUserId(userId); + if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId); + if (!headers.size() && encoded) encoded->populate(headers); + if (encoded && !contentDecoded) { + encoded->getBody(bytes, content); + contentDecoded = true; + } + + encoded.reset(); +} + +MessageImpl& MessageImplAccess::get(Message& msg) +{ + return *msg.impl; +} +const MessageImpl& MessageImplAccess::get(const Message& msg) +{ + return *msg.impl; +} +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.h b/qpid/cpp/src/qpid/messaging/MessageImpl.h new file mode 100644 index 0000000000..647972de16 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.h @@ -0,0 +1,122 @@ +#ifndef QPID_MESSAGING_MESSAGEIMPL_H +#define QPID_MESSAGING_MESSAGEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/messaging/ImportExport.h" + +#include "qpid/messaging/Address.h" +#include "qpid/types/Variant.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/messaging/amqp/EncodedMessage.h" +#include <vector> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace messaging { + +class MessageImpl +{ + private: + mutable Address replyTo; + mutable std::string subject; + mutable std::string contentType; + mutable std::string messageId; + mutable std::string userId; + mutable std::string correlationId; + uint8_t priority; + uint64_t ttl; + bool durable; + bool redelivered; + mutable qpid::types::Variant::Map headers; + + mutable std::string bytes; + mutable qpid::types::Variant content; + mutable bool contentDecoded; + boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> encoded; + + qpid::framing::SequenceNumber internalId; + + void updated(); + public: + MessageImpl(const std::string& c); + MessageImpl(const char* chars, size_t count); + + void clear(); + void setReplyTo(const Address& d); + QPID_MESSAGING_EXTERN const Address& getReplyTo() const; + + void setSubject(const std::string& s); + QPID_MESSAGING_EXTERN const std::string& getSubject() const; + + void setContentType(const std::string& s); + QPID_MESSAGING_EXTERN const std::string& getContentType() const; + + void setMessageId(const std::string&); + QPID_MESSAGING_EXTERN const std::string& getMessageId() const; + void setUserId(const std::string& ); + QPID_MESSAGING_EXTERN const std::string& getUserId() const; + void setCorrelationId(const std::string& ); + QPID_MESSAGING_EXTERN const std::string& getCorrelationId() const; + void setPriority(uint8_t); + QPID_MESSAGING_EXTERN uint8_t getPriority() const; + void setTtl(uint64_t); + QPID_MESSAGING_EXTERN uint64_t getTtl() const; + void setDurable(bool); + QPID_MESSAGING_EXTERN bool isDurable() const; + void setRedelivered(bool); + QPID_MESSAGING_EXTERN bool isRedelivered() const; + + + QPID_MESSAGING_EXTERN const qpid::types::Variant::Map& getHeaders() const; + qpid::types::Variant::Map& getHeaders(); + void setHeader(const std::string& key, const qpid::types::Variant& val); + + void setBytes(const std::string& bytes); + void setBytes(const char* chars, size_t count); + QPID_MESSAGING_EXTERN const std::string& getBytes() const; + std::string& getBytes(); + qpid::types::Variant& getContent(); + QPID_MESSAGING_EXTERN const qpid::types::Variant& getContent() const; + + QPID_MESSAGING_EXTERN void setInternalId(qpid::framing::SequenceNumber id); + QPID_MESSAGING_EXTERN qpid::framing::SequenceNumber getInternalId(); + void setEncoded(boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> e) { encoded = e; } + boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> getEncoded() const { return encoded; } +}; + +class Message; + +/** + * Provides access to the internal MessageImpl for a message which is + * useful when accessing any message state not exposed directly + * through the public API. + */ +struct MessageImplAccess +{ + QPID_MESSAGING_EXTERN static MessageImpl& get(Message&); + QPID_MESSAGING_EXTERN static const MessageImpl& get(const Message&); +}; + +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_MESSAGEIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Message_io.cpp b/qpid/cpp/src/qpid/messaging/Message_io.cpp new file mode 100644 index 0000000000..b6bc43a4e6 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Message_io.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Message_io.h" +#include <ostream> + +namespace qpid { +namespace messaging { + +using namespace std; +ostream& operator<<(ostream& o, const Message& message) { + o << "Message(properties=" << message.getProperties(); + if (!message.getSubject().empty()) { + o << ", subject='" << message.getSubject() << "'"; + } + if (!message.getContentObject().isVoid()) { + o << ", content='"; + if (message.getContentType() == "amqp/map") { + o << message.getContentObject().asMap(); + } else { + o << message.getContentObject(); + } + } + o << "')"; + return o; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/PrivateImplRef.h b/qpid/cpp/src/qpid/messaging/PrivateImplRef.h new file mode 100644 index 0000000000..a60f4eeadf --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/PrivateImplRef.h @@ -0,0 +1,94 @@ +#ifndef QPID_MESSAGING_PRIVATEIMPL_H +#define QPID_MESSAGING_PRIVATEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/messaging/ImportExport.h" +#include <boost/intrusive_ptr.hpp> +#include "qpid/RefCounted.h" + +namespace qpid { +namespace messaging { + +/** + * Helper class to implement a class with a private, reference counted + * implementation and reference semantics. + * + * Such classes are used in the public API to hide implementation, they + * should. Example of use: + * + * === Foo.h + * + * template <class T> PrivateImplRef; + * class FooImpl; + * + * Foo : public Handle<FooImpl> { + * public: + * Foo(FooImpl* = 0); + * Foo(const Foo&); + * ~Foo(); + * Foo& operator=(const Foo&); + * + * int fooDo(); // and other Foo functions... + * + * private: + * typedef FooImpl Impl; + * Impl* impl; + * friend class PrivateImplRef<Foo>; + * + * === Foo.cpp + * + * typedef PrivateImplRef<Foo> PI; + * Foo::Foo(FooImpl* p) { PI::ctor(*this, p); } + * Foo::Foo(const Foo& c) : Handle<FooImpl>() { PI::copy(*this, c); } + * Foo::~Foo() { PI::dtor(*this); } + * Foo& Foo::operator=(const Foo& c) { return PI::assign(*this, c); } + * + * int foo::fooDo() { return impl->fooDo(); } + * + */ +template <class T> class PrivateImplRef { + public: + typedef typename T::Impl Impl; + typedef boost::intrusive_ptr<Impl> intrusive_ptr; + + /** Get the implementation pointer from a handle */ + static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); } + + /** Set the implementation pointer in a handle */ + static void set(T& t, const intrusive_ptr& p) { + if (t.impl == p) return; + if (t.impl) intrusive_ptr_release(t.impl); + t.impl = p.get(); + if (t.impl) intrusive_ptr_add_ref(t.impl); + } + + // Helper functions to implement the ctor, dtor, copy, assign + static void ctor(T& t, Impl* p) { t.impl = p; if (p) intrusive_ptr_add_ref(p); } + static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); } + static void dtor(T& t) { if(t.impl) intrusive_ptr_release(t.impl); } + static T& assign(T& t, const T& x) { set(t, get(x)); return t;} +}; + +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_PRIVATEIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp new file mode 100644 index 0000000000..dbb0d6dfc2 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ProtocolRegistry.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/client/amqp0_10/ConnectionImpl.h" +#include "qpid/client/LoadPlugins.h" +#include "qpid/log/Statement.h" +#include "qpid/Options.h" +#include "qpid/StringUtils.h" +#include "config.h" +#include <map> +#include <sstream> +#include <boost/bind.hpp> + +using qpid::types::Variant; + +namespace qpid { +namespace messaging { +namespace { +struct ProtocolOptions : qpid::Options +{ + std::string protocolDefaults; + + ProtocolOptions() : qpid::Options("Protocol Settings") + { + addOptions() + ("protocol-defaults", optValue(protocolDefaults, "PROTOCOLS"), "Protocols to use when none are specified"); + } +}; +const std::string SEPARATOR(", "); +const std::string EMPTY; +std::string join(const std::vector<std::string>& in, const std::string& base=EMPTY, const std::string& separator = SEPARATOR) +{ + std::stringstream out; + if (!base.empty()) out << base; + for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) { + if (i != in.begin()) out << separator; + out << *i; + } + return out.str(); +} + +typedef std::map<std::string, ProtocolRegistry::Factory*> Factories; + +ConnectionImpl* create_0_10(const std::string& url, const qpid::types::Variant::Map& options) +{ + return new qpid::client::amqp0_10::ConnectionImpl(url, options); +} + +class Registry +{ + public: + Registry() + { + factories["amqp0-10"] = &create_0_10; + CommonOptions common("", "", QPIDC_CONF_FILE); + ProtocolOptions options; + try { + common.parse(0, 0, common.clientConfig, true); + options.parse (0, 0, common.clientConfig, true); + } catch (const std::exception& e) { + throw qpid::types::Exception(QPID_MSG("Failed to parse options while initialising Protocol Registry: " << e.what())); + } + QPID_LOG(debug, "Protocol defaults: " << options.protocolDefaults); + if (!options.protocolDefaults.empty()) { + split(versions, options.protocolDefaults, ", "); + } + } + ProtocolRegistry::Factory* find(const std::string& name) const + { + Factories::const_iterator i = factories.find(name); + if (i == factories.end()) { + std::stringstream error; + error << "Unsupported protocol: " << name; + error << " (valid values are " << getNames() << ")"; + throw MessagingException(error.str()); + } else { + return i->second; + } + } + void add(const std::string& name, ProtocolRegistry::Factory* factory) + { + factories[name] = factory; + } + std::string getNames() const + { + std::stringstream names; + for (Factories::const_iterator i = factories.begin(); i != factories.end(); ++i) { + if (i != factories.begin()) names << ", "; + names << i->first; + } + return names.str(); + } + void collectNames(std::vector<std::string>& names) const + { + for (std::vector< std::string >::const_iterator i = versions.begin(); i != versions.end(); ++i) { + Factories::const_iterator j = factories.find(*i); + if (j == factories.end()) { + QPID_LOG(notice, "Unsupported protocol specified in defaults " << *i); + } else { + names.push_back(*i); + } + } + if (names.empty()) { + if (!versions.empty()) { + QPID_LOG(warning, "Protocol defaults specified are not valid (" << join(versions) << ") falling back to " << getNames()); + } + for (Factories::const_iterator i = factories.begin(); i != factories.end(); ++i) { + names.push_back(i->first); + } + } + } + private: + Factories factories; + std::vector<std::string> versions; +}; + +Registry& theRegistry() +{ + static Registry factories; + return factories; +} + +bool extract(const std::string& key, Variant& value, const Variant::Map& in, Variant::Map& out) +{ + bool matched = false; + for (Variant::Map::const_iterator i = in.begin(); i != in.end(); ++i) { + if (i->first == key) { + value = i->second; + matched = true; + } else { + out.insert(*i); + } + } + return matched; +} +} + +ConnectionImpl* ProtocolRegistry::create(const std::string& url, const Variant::Map& options) +{ + qpid::client::theModuleLoader();//ensure modules are loaded + Variant name; + Variant::Map stripped; + std::vector<std::string> versions; + if (extract("protocol", name, options, stripped)) { + split(versions, name.asString(), ", "); + } else { + theRegistry().collectNames(versions); + } + bool debugOn; + QPID_LOG_TEST(debug, debugOn); + if (debugOn) { + QPID_LOG(debug, "Trying versions " << join(versions)); + } + return createInternal(versions, url, stripped, join(versions, "No suitable protocol version supported by peer, tried ")); +} + +ConnectionImpl* ProtocolRegistry::createInternal(const std::vector<std::string>& requested, const std::string& url, const Variant::Map& options, const std::string& error) +{ + std::vector<std::string>::const_iterator i = requested.begin(); + if (i == requested.end()) + throw MessagingException(error); + std::string name = *i; + ConnectionImpl* result = theRegistry().find(name)(url, options); + result->next = boost::bind(&ProtocolRegistry::createInternal, std::vector<std::string>(++i, requested.end()), url, options, error); + return result; + } + +ConnectionImpl* ProtocolRegistry::next(ConnectionImpl* last) +{ + if (last->next) { + return last->next(); + } + throw MessagingException("No suitable protocol version supported by peer"); +} + +void ProtocolRegistry::add(const std::string& name, Factory* factory) +{ + theRegistry().add(name, factory); +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h new file mode 100644 index 0000000000..6a6f5962c3 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h @@ -0,0 +1,47 @@ +#ifndef QPID_MESSAGING_PROTOCOLREGISTRY_H +#define QPID_MESSAGING_PROTOCOLREGISTRY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/messaging/ImportExport.h" +#include "qpid/types/Variant.h" +#include <vector> + +namespace qpid { +namespace messaging { +class ConnectionImpl; +/** + * Registry for different implementations of the messaging API e.g AMQP 1.0 + */ +class ProtocolRegistry +{ + public: + typedef ConnectionImpl* Factory(const std::string& url, const qpid::types::Variant::Map& options); + static ConnectionImpl* create(const std::string& url, const qpid::types::Variant::Map& options); + static ConnectionImpl* next(ConnectionImpl*); + QPID_MESSAGING_EXTERN static void add(const std::string& name, Factory* factory); + private: + static ConnectionImpl* createInternal(const std::vector<std::string>& versions, const std::string& url, const qpid::types::Variant::Map& options, const std::string& error); +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_PROTOCOLREGISTRY_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Receiver.cpp b/qpid/cpp/src/qpid/messaging/Receiver.cpp new file mode 100644 index 0000000000..a2c6b4cade --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Receiver.cpp @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/messaging/ReceiverImpl.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace messaging { + +// Explicitly instantiate Handle superclass +template class Handle<ReceiverImpl>; + +typedef PrivateImplRef<qpid::messaging::Receiver> PI; + +Receiver::Receiver(ReceiverImpl* impl) { PI::ctor(*this, impl); } +Receiver::Receiver(const Receiver& s) : Handle<ReceiverImpl>() { PI::copy(*this, s); } +Receiver::~Receiver() { PI::dtor(*this); } +Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); } +bool Receiver::get(Message& message, Duration timeout) +{ + MessageImplAccess::get(message).clear(); + return impl->get(message, timeout); +} +Message Receiver::get(Duration timeout) { return impl->get(timeout); } +bool Receiver::fetch(Message& message, Duration timeout) +{ + MessageImplAccess::get(message).clear(); + return impl->fetch(message, timeout); +} +Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); } +void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); } +uint32_t Receiver::getCapacity() { return impl->getCapacity(); } +uint32_t Receiver::getAvailable() { return impl->getAvailable(); } +uint32_t Receiver::getUnsettled() { return impl->getUnsettled(); } +void Receiver::close() { impl->close(); } +const std::string& Receiver::getName() const { return impl->getName(); } +Session Receiver::getSession() const { return impl->getSession(); } +bool Receiver::isClosed() const { return impl->isClosed(); } +Address Receiver::getAddress() const { return impl->getAddress(); } +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h new file mode 100644 index 0000000000..59ccc3214e --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h @@ -0,0 +1,56 @@ +#ifndef QPID_MESSAGING_RECEIVERIMPL_H +#define QPID_MESSAGING_RECEIVERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/RefCounted.h" +#include "qpid/sys/IntegerTypes.h" + +namespace qpid { +namespace messaging { + +class Address; +class Duration; +class Message; +class MessageListener; +class Session; + +class ReceiverImpl : public virtual qpid::RefCounted +{ + public: + virtual ~ReceiverImpl() {} + virtual bool get(Message& message, Duration timeout) = 0; + virtual Message get(Duration timeout) = 0; + virtual bool fetch(Message& message, Duration timeout) = 0; + virtual Message fetch(Duration timeout) = 0; + virtual void setCapacity(uint32_t) = 0; + virtual uint32_t getCapacity() = 0; + virtual uint32_t getAvailable() = 0; + virtual uint32_t getUnsettled() = 0; + virtual void close() = 0; + virtual const std::string& getName() const = 0; + virtual Session getSession() const = 0; + virtual bool isClosed() const = 0; + virtual Address getAddress() const = 0; +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_RECEIVERIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Sender.cpp b/qpid/cpp/src/qpid/messaging/Sender.cpp new file mode 100644 index 0000000000..a26f2544c8 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Sender.cpp @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/SenderImpl.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace messaging { + +// Explicitly instantiate Handle superclass +template class Handle<SenderImpl>; + +typedef PrivateImplRef<qpid::messaging::Sender> PI; + +Sender::Sender(SenderImpl* impl) { PI::ctor(*this, impl); } +Sender::Sender(const Sender& s) : qpid::messaging::Handle<SenderImpl>() { PI::copy(*this, s); } +Sender::~Sender() { PI::dtor(*this); } +Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); } +void Sender::send(const Message& message, bool sync) { impl->send(message, sync); } +void Sender::close() { impl->close(); } +void Sender::setCapacity(uint32_t c) { impl->setCapacity(c); } +uint32_t Sender::getCapacity() { return impl->getCapacity(); } +uint32_t Sender::getUnsettled() { return impl->getUnsettled(); } +uint32_t Sender::getAvailable() { return getCapacity() - getUnsettled(); } +const std::string& Sender::getName() const { return impl->getName(); } +Session Sender::getSession() const { return impl->getSession(); } +Address Sender::getAddress() const { return impl->getAddress(); } +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/SenderImpl.h b/qpid/cpp/src/qpid/messaging/SenderImpl.h new file mode 100644 index 0000000000..91fd9b1536 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/SenderImpl.h @@ -0,0 +1,50 @@ +#ifndef QPID_MESSAGING_SENDERIMPL_H +#define QPID_MESSAGING_SENDERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/RefCounted.h" +#include "qpid/sys/IntegerTypes.h" + +namespace qpid { +namespace messaging { + +class Address; +class Message; +class Session; + +class SenderImpl : public virtual qpid::RefCounted +{ + public: + virtual ~SenderImpl() {} + virtual void send(const Message& message, bool sync) = 0; + virtual void close() = 0; + virtual void setCapacity(uint32_t) = 0; + virtual uint32_t getCapacity() = 0; + virtual uint32_t getUnsettled() = 0; + virtual const std::string& getName() const = 0; + virtual Session getSession() const = 0; + virtual Address getAddress() const = 0; + private: +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_SENDERIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Session.cpp b/qpid/cpp/src/qpid/messaging/Session.cpp new file mode 100644 index 0000000000..fd0519705d --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Session.cpp @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/SessionImpl.h" +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace messaging { + +// Explicitly instantiate Handle superclass +template class Handle<SessionImpl>; + +typedef PrivateImplRef<qpid::messaging::Session> PI; + +Session::Session(SessionImpl* impl) { PI::ctor(*this, impl); } +Session::Session(const Session& s) : Handle<SessionImpl>() { PI::copy(*this, s); } +Session::~Session() { PI::dtor(*this); } +Session& Session::operator=(const Session& s) { return PI::assign(*this, s); } +void Session::commit() { impl->commit(); } +void Session::rollback() { impl->rollback(); } +void Session::acknowledge(bool sync) { impl->acknowledge(sync); } +void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m, false); sync(s); } +void Session::acknowledgeUpTo(Message& m, bool s) { impl->acknowledge(m, true); sync(s); } +void Session::reject(Message& m) { impl->reject(m); } +void Session::release(Message& m) { impl->release(m); } +void Session::close() { impl->close(); } + +Sender Session::createSender(const Address& address) +{ + return impl->createSender(address); +} +Receiver Session::createReceiver(const Address& address) +{ + return impl->createReceiver(address); +} + +Sender Session::createSender(const std::string& address) +{ + return impl->createSender(Address(address)); +} +Receiver Session::createReceiver(const std::string& address) +{ + return impl->createReceiver(Address(address)); +} + +void Session::sync(bool block) +{ + impl->sync(block); +} + +bool Session::nextReceiver(Receiver& receiver, Duration timeout) +{ + return impl->nextReceiver(receiver, timeout); +} + + +Receiver Session::nextReceiver(Duration timeout) +{ + return impl->nextReceiver(timeout); +} + +uint32_t Session::getReceivable() { return impl->getReceivable(); } +uint32_t Session::getUnsettledAcks() { return impl->getUnsettledAcks(); } + +Sender Session::getSender(const std::string& name) const +{ + return impl->getSender(name); +} +Receiver Session::getReceiver(const std::string& name) const +{ + return impl->getReceiver(name); +} + +Connection Session::getConnection() const +{ + return impl->getConnection(); +} + +void Session::checkError() { impl->checkError(); } +bool Session::hasError() +{ + try { + checkError(); + return false; + } catch (const std::exception&) { + return true; + } +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/SessionImpl.h b/qpid/cpp/src/qpid/messaging/SessionImpl.h new file mode 100644 index 0000000000..60ae615253 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/SessionImpl.h @@ -0,0 +1,63 @@ +#ifndef QPID_MESSAGING_SESSIONIMPL_H +#define QPID_MESSAGING_SESSIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/RefCounted.h" +#include <string> +#include "qpid/messaging/Duration.h" + +namespace qpid { +namespace messaging { + +class Address; +class Connection; +class Message; +class Sender; +class Receiver; + +class SessionImpl : public virtual qpid::RefCounted +{ + public: + virtual ~SessionImpl() {} + virtual void commit() = 0; + virtual void rollback() = 0; + virtual void acknowledge(bool sync) = 0; + virtual void acknowledge(Message&, bool cumulative) = 0; + virtual void reject(Message&) = 0; + virtual void release(Message&) = 0; + virtual void close() = 0; + virtual void sync(bool block) = 0; + virtual Sender createSender(const Address& address) = 0; + virtual Receiver createReceiver(const Address& address) = 0; + virtual bool nextReceiver(Receiver& receiver, Duration timeout) = 0; + virtual Receiver nextReceiver(Duration timeout) = 0; + virtual uint32_t getReceivable() = 0; + virtual uint32_t getUnsettledAcks() = 0; + virtual Sender getSender(const std::string& name) const = 0; + virtual Receiver getReceiver(const std::string& name) const = 0; + virtual Connection getConnection() const = 0; + virtual void checkError() = 0; + private: +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_SESSIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp new file mode 100644 index 0000000000..8033cc5dee --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -0,0 +1,781 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PnData.h" +#include "qpid/messaging/amqp/AddressHelper.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/AddressImpl.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/types/encodings.h" +#include "qpid/log/Statement.h" +#include <vector> +#include <set> +#include <sstream> +#include <boost/assign.hpp> +#include <boost/format.hpp> +extern "C" { +#include <proton/engine.h> +} + + +namespace qpid { +namespace messaging { +namespace amqp { + +using qpid::types::Variant; + +namespace { +//policy types +const std::string CREATE("create"); +const std::string ASSERT("assert"); +const std::string DELETE("delete"); + +//policy values +const std::string ALWAYS("always"); +const std::string NEVER("never"); +const std::string RECEIVER("receiver"); +const std::string SENDER("sender"); + +const std::string NODE("node"); +const std::string LINK("link"); +const std::string CAPABILITIES("capabilities"); +const std::string PROPERTIES("properties"); +const std::string MODE("mode"); +const std::string BROWSE("browse"); +const std::string CONSUME("consume"); +const std::string TIMEOUT("timeout"); + +const std::string TYPE("type"); +const std::string TOPIC("topic"); +const std::string QUEUE("queue"); +const std::string DURABLE("durable"); +const std::string NAME("name"); +const std::string RELIABILITY("reliability"); +const std::string SELECTOR("selector"); +const std::string FILTER("filter"); +const std::string DESCRIPTOR("descriptor"); +const std::string VALUE("value"); +const std::string SUBJECT_FILTER("subject-filter"); +const std::string SOURCE("sender-source"); +const std::string TARGET("receiver-target"); + +//reliability options: +const std::string UNRELIABLE("unreliable"); +const std::string AT_MOST_ONCE("at-most-once"); +const std::string AT_LEAST_ONCE("at-least-once"); +const std::string EXACTLY_ONCE("exactly-once"); + +//distribution modes: +const std::string MOVE("move"); +const std::string COPY("copy"); + +const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); +const std::string AUTO_DELETE("auto-delete"); +const std::string LIFETIME_POLICY("lifetime-policy"); +const std::string DELETE_ON_CLOSE("delete-on-close"); +const std::string DELETE_IF_UNUSED("delete-if-unused"); +const std::string DELETE_IF_EMPTY("delete-if-empty"); +const std::string DELETE_IF_UNUSED_AND_EMPTY("delete-if-unused-and-empty"); +const std::string CREATE_ON_DEMAND("create-on-demand"); + +const std::string X_DECLARE("x-declare"); +const std::string X_BINDINGS("x-bindings"); +const std::string X_SUBSCRIBE("x-subscribe"); +const std::string ARGUMENTS("arguments"); +const std::string EXCHANGE_TYPE("exchange-type"); + +const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER); +const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER); + +class Verifier +{ + public: + Verifier(); + void verify(const Address& address) const; + private: + Variant::Map defined; + void verify(const Variant::Map& allowed, const Variant::Map& actual) const; +}; +const Verifier verifier; + +pn_bytes_t convert(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; +} + +std::string convert(pn_bytes_t in) +{ + return std::string(in.start, in.size); +} + +bool hasWildcards(const std::string& key) +{ + return key.find('*') != std::string::npos || key.find('#') != std::string::npos; +} + +uint64_t getFilterDescriptor(const std::string& key) +{ + return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE; +} + +bool test(const Variant::Map& options, const std::string& name) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + return j->second; + } +} + +template <typename T> T get(const Variant::Map& options, const std::string& name, T defaultValue) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return defaultValue; + } else { + return j->second; + } +} + +bool bind(const Variant::Map& options, const std::string& name, std::string& variable) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + variable = j->second.asString(); + return true; + } +} + +bool bind(const Variant::Map& options, const std::string& name, Variant::Map& variable) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + variable = j->second.asMap(); + return true; + } +} + +bool bind(const Variant::Map& options, const std::string& name, Variant::List& variable) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + variable = j->second.asList(); + return true; + } +} + +bool bind(const Address& address, const std::string& name, std::string& variable) +{ + return bind(address.getOptions(), name, variable); +} + +bool bind(const Address& address, const std::string& name, Variant::Map& variable) +{ + return bind(address.getOptions(), name, variable); +} + +bool in(const std::string& value, const std::vector<std::string>& choices) +{ + for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) { + if (value == *i) return true; + } + return false; +} +void add(Variant::Map& target, const Variant::Map& source) +{ + for (Variant::Map::const_iterator i = source.begin(); i != source.end(); ++i) { + target[i->first] = i->second; + } +} +void flatten(Variant::Map& base, const std::string& nested) +{ + Variant::Map::iterator i = base.find(nested); + if (i != base.end()) { + add(base, i->second.asMap()); + base.erase(i); + } +} +bool replace(Variant::Map& map, const std::string& original, const std::string& desired) +{ + Variant::Map::iterator i = map.find(original); + if (i != map.end()) { + map[desired] = i->second; + map.erase(original); + return true; + } else { + return false; + } +} + +const uint32_t DEFAULT_DURABLE_TIMEOUT(2*60);//2 minutes +const uint32_t DEFAULT_TIMEOUT(0); +} + +AddressHelper::AddressHelper(const Address& address) : + isTemporary(AddressImpl::isTemporary(address)), + name(address.getName()), + type(address.getType()), + durableNode(false), + durableLink(false), + timeout(0), + browse(false) +{ + verifier.verify(address); + bind(address, CREATE, createPolicy); + bind(address, DELETE, deletePolicy); + bind(address, ASSERT, assertPolicy); + + bind(address, NODE, node); + bind(address, LINK, link); + bind(node, PROPERTIES, properties); + bind(node, CAPABILITIES, capabilities); + bind(link, RELIABILITY, reliability); + durableNode = test(node, DURABLE); + durableLink = test(link, DURABLE); + timeout = get(link, TIMEOUT, durableLink && reliability != AT_LEAST_ONCE ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT); + std::string mode; + if (bind(address, MODE, mode)) { + if (mode == BROWSE) { + browse = true; + } else if (mode != CONSUME) { + throw qpid::messaging::AddressError("Invalid value for mode; must be 'browse' or 'consume'."); + } + } + + if (!deletePolicy.empty()) { + throw qpid::messaging::AddressError("Delete policies not supported over AMQP 1.0."); + } + if (node.find(X_BINDINGS) != node.end()) { + throw qpid::messaging::AddressError("Node scoped x-bindings element not supported over AMQP 1.0."); + } + if (link.find(X_BINDINGS) != link.end()) { + throw qpid::messaging::AddressError("Link scoped x-bindings element not supported over AMQP 1.0."); + } + if (link.find(X_SUBSCRIBE) != link.end()) { + throw qpid::messaging::AddressError("Link scoped x-subscribe element not supported over AMQP 1.0."); + } + if (link.find(X_DECLARE) != link.end()) { + throw qpid::messaging::AddressError("Link scoped x-declare element not supported over AMQP 1.0."); + } + //massage x-declare into properties + Variant::Map::iterator i = node.find(X_DECLARE); + if (i != node.end()) { + Variant::Map x_declare = i->second.asMap(); + replace(x_declare, TYPE, EXCHANGE_TYPE); + flatten(x_declare, ARGUMENTS); + add(properties, x_declare); + node.erase(i); + } + //for temp queues, if neither lifetime-policy nor autodelete are specified, assume delete-on-close + if (isTemporary && properties.find(LIFETIME_POLICY) == properties.end() && properties.find(AUTO_DELETE) == properties.end()) { + properties[LIFETIME_POLICY] = DELETE_ON_CLOSE; + } + + if (properties.size() && !(isTemporary || !createPolicy.empty() || !assertPolicy.empty())) { + QPID_LOG(warning, "Properties will be ignored! " << address); + } + + qpid::types::Variant::Map::const_iterator selector = link.find(SELECTOR); + if (selector != link.end()) { + addFilter(SELECTOR, qpid::amqp::filters::SELECTOR_FILTER_CODE, selector->second); + } + if (!address.getSubject().empty()) { + addFilter(SUBJECT_FILTER, getFilterDescriptor(address.getSubject()), address.getSubject()); + } + qpid::types::Variant::Map::const_iterator filter = link.find(FILTER); + if (filter != link.end()) { + if (filter->second.getType() == qpid::types::VAR_MAP) { + addFilter(filter->second.asMap()); + } else if (filter->second.getType() == qpid::types::VAR_LIST) { + addFilters(filter->second.asList()); + } else { + throw qpid::messaging::AddressError("Filter must be a map or a list of maps, each containing name, descriptor and value."); + } + } +} + +void AddressHelper::addFilters(const qpid::types::Variant::List& f) +{ + for (qpid::types::Variant::List::const_iterator i = f.begin(); i != f.end(); ++i) { + addFilter(i->asMap()); + } +} + +void AddressHelper::addFilter(const qpid::types::Variant::Map& f) +{ + qpid::types::Variant::Map::const_iterator name = f.find(NAME); + qpid::types::Variant::Map::const_iterator descriptor = f.find(DESCRIPTOR); + qpid::types::Variant::Map::const_iterator value = f.find(VALUE); + //all fields are required at present (may relax this at a later stage): + if (name == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify name"); + } + if (descriptor == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify descriptor"); + } + if (value == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify value"); + } + try { + addFilter(name->second.asString(), descriptor->second.asUint64(), value->second); + } catch (const qpid::types::InvalidConversion&) { + addFilter(name->second.asString(), descriptor->second.asString(), value->second); + } + +} + +AddressHelper::Filter::Filter() : descriptorCode(0), confirmed(false) {} +AddressHelper::Filter::Filter(const std::string& n, uint64_t d, const qpid::types::Variant& v) : name(n), descriptorCode(d), value(v), confirmed(false) {} +AddressHelper::Filter::Filter(const std::string& n, const std::string& d, const qpid::types::Variant& v) : name(n), descriptorSymbol(d), descriptorCode(0), value(v), confirmed(false) {} + +void AddressHelper::addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value) +{ + filters.push_back(Filter(name, descriptor, value)); +} +void AddressHelper::addFilter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value) +{ + filters.push_back(Filter(name, descriptor, value)); +} + +namespace { +bool checkLifetimePolicy(const std::string& requested, const std::string& actual) +{ + if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL && requested == DELETE_ON_CLOSE) return true; + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL && requested == DELETE_IF_UNUSED) return true; + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL && requested == DELETE_IF_EMPTY) return true; + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL && requested == DELETE_IF_UNUSED_AND_EMPTY) return true; + else return actual == requested; +} +bool checkLifetimePolicy(const std::string& requested, uint64_t actual) +{ + if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL); + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL); + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL); + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL); + else + return false; +} +bool checkLifetimePolicy(const std::string& requested, pn_data_t* actual) +{ + bool result(false); + if (pn_data_is_described(actual)) { + pn_data_enter(actual); + pn_data_next(actual); + if (pn_data_type(actual) == PN_ULONG) { + result = checkLifetimePolicy(requested, pn_data_get_ulong(actual)); + } else if (pn_data_type(actual) == PN_SYMBOL) { + result = checkLifetimePolicy(requested, convert(pn_data_get_symbol(actual))); + } + pn_data_exit(actual); + } + return result; +} +} +void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) +{ + if (assertEnabled(mode)) { + QPID_LOG(debug, "checking capabilities: " << capabilities); + //ensure all desired capabilities have been offered + std::set<std::string> desired; + for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { + if (*i != CREATE_ON_DEMAND) desired.insert(i->asString()); + } + pn_data_t* data = pn_terminus_capabilities(terminus); + if (pn_data_next(data)) { + pn_type_t type = pn_data_type(data); + if (type == PN_ARRAY) { + pn_data_enter(data); + while (pn_data_next(data)) { + desired.erase(convert(pn_data_get_symbol(data))); + } + pn_data_exit(data); + } else if (type == PN_SYMBOL) { + desired.erase(convert(pn_data_get_symbol(data))); + } else { + QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type)); + } + } + + if (desired.size()) { + std::stringstream missing; + missing << "Desired capabilities not met: "; + bool first(true); + for (std::set<std::string>::const_iterator i = desired.begin(); i != desired.end(); ++i) { + if (first) first = false; + else missing << ", "; + missing << *i; + } + throw qpid::messaging::AssertionFailed(missing.str()); + } + + //ensure all desired filters are in use + data = pn_terminus_filter(terminus); + if (pn_data_next(data)) { + size_t count = pn_data_get_map(data); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + //skip key: + if (!pn_data_next(data)) break; + //expecting described value: + if (pn_data_is_described(data)) { + pn_data_enter(data); + pn_data_next(data); + if (pn_data_type(data) == PN_ULONG) { + confirmFilter(pn_data_get_ulong(data)); + } else if (pn_data_type(data) == PN_SYMBOL) { + confirmFilter(convert(pn_data_get_symbol(data))); + } + pn_data_exit(data); + } + } + pn_data_exit(data); + } + std::stringstream missing; + missing << "Desired filters not in use: "; + bool first(true); + for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) { + if (!i->confirmed) { + if (first) first = false; + else missing << ", "; + missing << i->name << "("; + if (i->descriptorSymbol.empty()) missing << "0x" << std::hex << i->descriptorCode; + else missing << i->descriptorSymbol; + missing << ")"; + } + } + if (!first) throw qpid::messaging::AssertionFailed(missing.str()); + + //assert on properties (Note: this violates the AMQP 1.0 + //specification - as does the create option - by sending + //node-properties even if the dynamic option is not + //set. However this can be avoided by not specifying any node + //properties when asserting) + if (!type.empty() || durableNode || !properties.empty()) { + bool isAutoDeleted = false; + qpid::types::Variant::Map requested = properties; + if (!type.empty()) requested[SUPPORTED_DIST_MODES] = type == TOPIC ? COPY : MOVE; + if (durableNode) requested[DURABLE] = true; + + data = pn_terminus_properties(terminus); + if (pn_data_next(data)) { + size_t count = pn_data_get_map(data); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + std::string key = convert(pn_data_get_symbol(data)); + pn_data_next(data); + qpid::types::Variant::Map::const_iterator j = requested.find(key); + qpid::types::Variant v; + if (key == LIFETIME_POLICY) { + isAutoDeleted = true; + if (j != requested.end() && checkLifetimePolicy(j->second.asString(), data)) { + requested.erase(j->first); + } + } else if (key == AUTO_DELETE) { + PnData(data).get(v); + isAutoDeleted = v.asBool(); + } else if (j != requested.end() && (PnData(data).get(v) && v.asString() == j->second.asString())) { + requested.erase(j->first); + } + } + pn_data_exit(data); + qpid::types::Variant::Map::iterator i = requested.find(AUTO_DELETE); + if (i != requested.end() && i->second.asBool() == isAutoDeleted) { + requested.erase(i); + } + if (!requested.empty()) { + std::stringstream missing; + missing << "Requested node properties not met: " << requested; + throw qpid::messaging::AssertionFailed(missing.str()); + } + } + } + } +} + +void AddressHelper::confirmFilter(const std::string& descriptor) +{ + for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) { + if (descriptor == i->descriptorSymbol) i->confirmed = true; + } +} + +void AddressHelper::confirmFilter(uint64_t descriptor) +{ + for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) { + if (descriptor == i->descriptorCode) i->confirmed = true; + } +} + +bool AddressHelper::createEnabled(CheckMode mode) const +{ + return enabled(createPolicy, mode); +} + +bool AddressHelper::assertEnabled(CheckMode mode) const +{ + return enabled(assertPolicy, mode); +} + +bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const +{ + bool result = false; + switch (mode) { + case FOR_RECEIVER: + result = in(policy, RECEIVER_MODES); + break; + case FOR_SENDER: + result = in(policy, SENDER_MODES); + break; + } + return result; +} + +bool AddressHelper::isUnreliable() const +{ + return reliability == AT_MOST_ONCE || reliability == UNRELIABLE || + (reliability.empty() && browse); // A browser defaults to unreliable. +} + +const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const +{ + return node; +} +const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const +{ + return link; +} + +bool AddressHelper::getLinkSource(std::string& out) const +{ + return getLinkOption(SOURCE, out); +} + +bool AddressHelper::getLinkTarget(std::string& out) const +{ + return getLinkOption(TARGET, out); +} + +bool AddressHelper::getLinkOption(const std::string& name, std::string& out) const +{ + qpid::types::Variant::Map::const_iterator i = link.find(name); + if (i != link.end()) { + out = i->second.asString(); + return true; + } else { + return false; + } +} + +void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode) +{ + bool createOnDemand(false); + if (isTemporary) { + //application expects a name to be generated + pn_terminus_set_dynamic(terminus, true); + setNodeProperties(terminus); + } else { + pn_terminus_set_address(terminus, name.c_str()); + if (createEnabled(mode)) { + //application expects name of node to be as specified + setNodeProperties(terminus); + createOnDemand = true; + } else if (assertEnabled(mode)) { + setNodeProperties(terminus); + } + } + + setCapabilities(terminus, createOnDemand); + if (durableLink) { + pn_terminus_set_durability(terminus, PN_DELIVERIES); + } + if (mode == FOR_RECEIVER) { + if (timeout) pn_terminus_set_timeout(terminus, timeout); + if (browse) { + pn_terminus_set_distribution_mode(terminus, PN_DIST_MODE_COPY); + } + //set filter(s): + if (!filters.empty()) { + pn_data_t* filter = pn_terminus_filter(terminus); + pn_data_put_map(filter); + pn_data_enter(filter); + for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) { + pn_data_put_symbol(filter, convert(i->name)); + pn_data_put_described(filter); + pn_data_enter(filter); + if (i->descriptorSymbol.size()) { + pn_data_put_symbol(filter, convert(i->descriptorSymbol)); + } else { + pn_data_put_ulong(filter, i->descriptorCode); + } + PnData(filter).put(i->value); + pn_data_exit(filter); + } + pn_data_exit(filter); + } + } + if (isUnreliable()) { + pn_link_set_snd_settle_mode(link, PN_SND_SETTLED); + } else if (!reliability.empty()) { + if (reliability == EXACTLY_ONCE ) { + QPID_LOG(warning, "Unsupported reliability mode: " << reliability); + } else if (reliability != AT_LEAST_ONCE ) { + QPID_LOG(warning, "Unrecognised reliability mode: " << reliability); + } + pn_link_set_snd_settle_mode(link, PN_SND_UNSETTLED); + } +} + +void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create) +{ + if (create) capabilities.push_back(CREATE_ON_DEMAND); + if (!type.empty()) capabilities.push_back(type); + if (durableNode) capabilities.push_back(DURABLE); + + pn_data_t* data = pn_terminus_capabilities(terminus); + if (capabilities.size() == 1) { + pn_data_put_symbol(data, convert(capabilities.front().asString())); + } else if (capabilities.size() > 1) { + pn_data_put_array(data, false, PN_SYMBOL); + pn_data_enter(data); + for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { + pn_data_put_symbol(data, convert(i->asString())); + } + pn_data_exit(data); + } +} +std::string AddressHelper::getLinkName(const Address& address) +{ + AddressHelper helper(address); + const qpid::types::Variant::Map& linkProps = helper.getLinkProperties(); + qpid::types::Variant::Map::const_iterator i = linkProps.find(NAME); + if (i != linkProps.end()) { + return i->second.asString(); + } else { + std::stringstream name; + name << address.getName() << "_" << qpid::types::Uuid(true); + return name.str(); + } +} +namespace { +std::string toLifetimePolicy(const std::string& value) +{ + if (value == DELETE_ON_CLOSE) return qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL; + else if (value == DELETE_IF_UNUSED) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL; + else if (value == DELETE_IF_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL; + else if (value == DELETE_IF_UNUSED_AND_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL; + else return value;//asume value is itself the symbolic descriptor +} +void putLifetimePolicy(pn_data_t* data, const std::string& value) +{ + pn_data_put_described(data); + pn_data_enter(data); + pn_data_put_symbol(data, convert(value)); + pn_data_put_list(data); + pn_data_exit(data); +} +} +void AddressHelper::setNodeProperties(pn_terminus_t* terminus) +{ + if (properties.size() || type.size() || durableNode) { + pn_data_t* data = pn_terminus_properties(terminus); + pn_data_put_map(data); + pn_data_enter(data); + if (type.size()) { + pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); + pn_data_put_string(data, convert(type == TOPIC ? COPY : MOVE)); + } + if (durableNode) { + pn_data_put_symbol(data, convert(DURABLE)); + pn_data_put_bool(data, true); + } + for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + if (i->first == LIFETIME_POLICY) { + pn_data_put_symbol(data, convert(i->first)); + putLifetimePolicy(data, toLifetimePolicy(i->second.asString())); + } else { + pn_data_put_symbol(data, convert(i->first)); + PnData(data).put(i->second); + } + } + pn_data_exit(data); + } +} + +Verifier::Verifier() +{ + defined[CREATE] = true; + defined[ASSERT] = true; + defined[DELETE] = true; + defined[MODE] = true; + Variant::Map node; + node[TYPE] = true; + node[DURABLE] = true; + node[PROPERTIES] = true; + node[CAPABILITIES] = true; + node[X_DECLARE] = true; + node[X_BINDINGS] = true; + defined[NODE] = node; + Variant::Map link; + link[NAME] = true; + link[DURABLE] = true; + link[RELIABILITY] = true; + link[TIMEOUT] = true; + link[SOURCE] = true; + link[TARGET] = true; + link[X_SUBSCRIBE] = true; + link[X_DECLARE] = true; + link[X_BINDINGS] = true; + link[SELECTOR] = true; + link[FILTER] = true; + defined[LINK] = link; +} +void Verifier::verify(const Address& address) const +{ + verify(defined, address.getOptions()); +} + +void Verifier::verify(const Variant::Map& allowed, const Variant::Map& actual) const +{ + for (Variant::Map::const_iterator i = actual.begin(); i != actual.end(); ++i) { + Variant::Map::const_iterator option = allowed.find(i->first); + if (option == allowed.end()) { + throw AddressError((boost::format("Unrecognised option: %1%") % i->first).str()); + } else if (option->second.getType() == qpid::types::VAR_MAP) { + verify(option->second.asMap(), i->second.asMap()); + } + } +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h new file mode 100644 index 0000000000..3ee58cad8d --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -0,0 +1,95 @@ +#ifndef QPID_MESSAGING_AMQP_ADDRESSHELPER_H +#define QPID_MESSAGING_AMQP_ADDRESSHELPER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/types/Variant.h" +#include <vector> + +struct pn_link_t; +struct pn_terminus_t; + +namespace qpid { +namespace messaging { +class Address; +namespace amqp { +class AddressHelper +{ + public: + enum CheckMode {FOR_RECEIVER, FOR_SENDER}; + + AddressHelper(const Address& address); + void configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode); + void checkAssertion(pn_terminus_t* terminus, CheckMode mode); + + bool isUnreliable() const; + const qpid::types::Variant::Map& getNodeProperties() const; + bool getLinkSource(std::string& out) const; + bool getLinkTarget(std::string& out) const; + const qpid::types::Variant::Map& getLinkProperties() const; + static std::string getLinkName(const Address& address); + private: + struct Filter + { + std::string name; + std::string descriptorSymbol; + uint64_t descriptorCode; + qpid::types::Variant value; + bool confirmed; + + Filter(); + Filter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value); + Filter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value); + }; + + bool isTemporary; + std::string createPolicy; + std::string assertPolicy; + std::string deletePolicy; + qpid::types::Variant::Map node; + qpid::types::Variant::Map link; + qpid::types::Variant::Map properties; + qpid::types::Variant::List capabilities; + std::string name; + std::string type; + std::string reliability; + bool durableNode; + bool durableLink; + uint32_t timeout; + bool browse; + std::vector<Filter> filters; + + bool enabled(const std::string& policy, CheckMode mode) const; + bool createEnabled(CheckMode mode) const; + bool assertEnabled(CheckMode mode) const; + void setCapabilities(pn_terminus_t* terminus, bool create); + void setNodeProperties(pn_terminus_t* terminus); + void addFilter(const qpid::types::Variant::Map&); + void addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value); + void addFilter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value); + void addFilters(const qpid::types::Variant::List&); + void confirmFilter(const std::string& descriptor); + void confirmFilter(uint64_t descriptor); + bool getLinkOption(const std::string& name, std::string& out) const; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_ADDRESSHELPER_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp new file mode 100644 index 0000000000..1b8c848941 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -0,0 +1,1317 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionContext.h" +#include "DriverImpl.h" +#include "PnData.h" +#include "ReceiverContext.h" +#include "Sasl.h" +#include "SenderContext.h" +#include "SessionContext.h" +#include "Transaction.h" +#include "Transport.h" +#include "util.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Encoder.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/AddressImpl.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/Uuid.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/SecurityLayer.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" +#include "qpid/sys/urlAdd.h" +#include "config.h" +#include <boost/lexical_cast.hpp> +#include <boost/bind.hpp> +#include <vector> +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { +using types::Variant; + +namespace { + +void do_trace(pn_transport_t* transport, const char* message) +{ + ConnectionContext* c = reinterpret_cast<ConnectionContext*>(pn_transport_get_context(transport)); + if (c) c->trace(message); +} + +void set_tracer(pn_transport_t* transport, void* context) +{ + pn_transport_set_context(transport, context); + pn_transport_set_tracer(transport, &do_trace); +} + +#ifdef USE_PROTON_TRANSPORT_CONDITION +std::string get_error(pn_connection_t* connection, pn_transport_t* transport) +{ + std::stringstream text; + pn_error_t* cerror = pn_connection_error(connection); + if (cerror) text << "connection error " << pn_error_text(cerror) << " [" << cerror << "]"; + pn_condition_t* tcondition = pn_transport_condition(transport); + if (pn_condition_is_set(tcondition)) text << get_error_string(tcondition, "transport error", ": "); + return text.str(); +} +#else +std::string get_error(pn_connection_t* connection, pn_transport_t* transport) +{ + std::stringstream text; + pn_error_t* cerror = pn_connection_error(connection); + if (cerror) text << "connection error " << pn_error_text(cerror) << " [" << cerror << "]"; + pn_error_t* terror = pn_transport_error(transport); + if (terror) text << "transport error " << pn_error_text(terror) << " [" << terror << "]"; + return text.str(); +} +#endif + +class ConnectionTickerTask : public qpid::sys::TimerTask +{ + qpid::sys::Timer& timer; + ConnectionContext& connection; + public: + ConnectionTickerTask(const qpid::sys::Duration& interval, qpid::sys::Timer& t, ConnectionContext& c) : + TimerTask(interval, "ConnectionTicker"), + timer(t), + connection(c) + {} + + void fire() { + QPID_LOG(debug, "ConnectionTickerTask fired"); + // Setup next firing + setupNextFire(); + timer.add(this); + + // Send Ticker + connection.activateOutput(); + } +}; +} + +void ConnectionContext::trace(const char* message) const +{ + QPID_LOG_CAT(trace, protocol, "[" << identifier << "]: " << message); +} + +ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::Variant::Map& o) + : qpid::messaging::ConnectionOptions(o), + fullUrl(url, protocol.empty() ? qpid::Address::TCP : protocol), + engine(pn_transport()), + connection(pn_connection()), + //note: disabled read/write of header as now handled by engine + writeHeader(false), + readHeader(false), + haveOutput(false), + state(DISCONNECTED), + codecAdapter(*this), + notifyOnWrite(false) +{ + // Concatenate all known URLs into a single URL, get rid of duplicate addresses. + sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ? + qpid::Address::TCP : protocol); + if (identifier.empty()) { + identifier = qpid::types::Uuid(true).str(); + } + configureConnection(); +} + +ConnectionContext::~ConnectionContext() +{ + if (ticker) ticker->cancel(); + close(); + sessions.clear(); + pn_connection_free(connection); + pn_transport_free(engine); +} + +bool ConnectionContext::isOpen() const +{ + sys::Monitor::ScopedLock l(lock); + return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); +} + +void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn) +{ + sys::Monitor::ScopedLock l(lock); + syncLH(ssn, l); +} + +void ConnectionContext::syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&) { + while (!ssn->settled()) { + QPID_LOG(debug, "Waiting for sends to settle on sync()"); + wait(ssn);//wait until message has been confirmed + wakeupDriver(); + } + checkClosed(ssn); +} + +void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) +{ + sys::Monitor::ScopedLock l(lock); + if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { + //explicitly release messages that have yet to be fetched + for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); i != ssn->receivers.end(); ++i) { + drain_and_release_messages(ssn, i->second); + } + syncLH(ssn, l); + } + + if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { + pn_session_close(ssn->session); + } + sessions.erase(ssn->getName()); + + wakeupDriver(); +} + +void ConnectionContext::close() +{ + sys::Monitor::ScopedLock l(lock); + if (state != CONNECTED) return; + if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + syncLH(i->second, l); + if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) { + pn_session_close(i->second->session); + } + } + pn_connection_close(connection); + wakeupDriver(); + //wait for close to be confirmed by peer? + while (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) { + if (state == DISCONNECTED) { + QPID_LOG(warning, "Disconnected before close received from peer."); + break; + } + lock.wait(); + } + sessions.clear(); + } + if (state != DISCONNECTED) { + transport->close(); + while (state != DISCONNECTED) { + lock.wait(); + } + } + if (ticker) { + ticker->cancel(); + ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(); + } +} + +bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + /** + * For fetch() on a receiver with zero capacity, need to reissue the + * credit on reconnect, so track the fetches in progress. + */ + qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching); + { + sys::Monitor::ScopedLock l(lock); + checkClosed(ssn, lnk); + if (!lnk->capacity) { + pn_link_flow(lnk->receiver, 1); + wakeupDriver(); + } + } + if (get(ssn, lnk, message, timeout)) { + return true; + } else { + { + sys::Monitor::ScopedLock l(lock); + pn_link_drain(lnk->receiver, 0); + wakeupDriver(); + while (pn_link_draining(lnk->receiver) && !pn_link_queued(lnk->receiver)) { + QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); + wait(ssn, lnk); + } + if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) { + pn_link_flow(lnk->receiver, lnk->capacity); + } + } + if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) { + return true; + } else { + return false; + } + } +} + +qpid::sys::AbsTime convert(qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until; + uint64_t ms = timeout.getMilliseconds(); + if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) { + return qpid::sys::AbsTime(qpid::sys::now(), ms * qpid::sys::TIME_MSEC); + } else { + return qpid::sys::FAR_FUTURE; + } +} + +bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until(convert(timeout)); + while (true) { + sys::Monitor::ScopedLock l(lock); + checkClosed(ssn, lnk); + pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver); + QPID_LOG(debug, "In ConnectionContext::get(), current=" << current); + if (current) { + qpid::messaging::MessageImpl& impl = MessageImplAccess::get(message); + boost::shared_ptr<EncodedMessage> encoded(new EncodedMessage(pn_delivery_pending(current))); + encoded->setNestAnnotationsOption(nestAnnotations); + ssize_t read = pn_link_recv(lnk->receiver, encoded->getData(), encoded->getSize()); + if (read < 0) throw qpid::messaging::MessagingException("Failed to read message"); + encoded->trim((size_t) read); + QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: "); + encoded->init(impl); + impl.setEncoded(encoded); + impl.setInternalId(ssn->record(current)); + if (lnk->capacity) { + pn_link_flow(lnk->receiver, 1); + if (lnk->wakeupToIssueCredit()) { + wakeupDriver(); + } else { + haveOutput = true; + } + } + // Automatically ack messages if we are in a transaction. + if (ssn->transaction) + acknowledgeLH(ssn, &message, false, l); + return true; + } else if (until > qpid::sys::now()) { + waitUntil(ssn, lnk, until); + } else { + return false; + } + } + return false; +} + +boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until(convert(timeout)); + while (true) { + sys::Monitor::ScopedLock l(lock); + checkClosed(ssn); + boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver(); + if (r) { + return r; + } else if (until > qpid::sys::now()) { + waitUntil(ssn, until); + } else { + return boost::shared_ptr<ReceiverContext>(); + } + } +} + +void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) { + sys::Monitor::ScopedLock l(lock); + acknowledgeLH(ssn, message, cumulative, l); +} + +void ConnectionContext::acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&) +{ + checkClosed(ssn); + if (message) { + ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative); + } else { + ssn->acknowledge(); + } + wakeupDriver(); +} + +void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject) +{ + sys::Monitor::ScopedLock l(lock); + checkClosed(ssn); + ssn->nack(MessageImplAccess::get(message).getInternalId(), reject); + wakeupDriver(); +} + +void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + sys::Monitor::ScopedLock l(lock); + if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) { + lnk->close(); + } + wakeupDriver(); + while (pn_link_state(lnk->sender) & PN_REMOTE_ACTIVE) { + wait(ssn); + } + ssn->removeSender(lnk->getName()); +} + +void ConnectionContext::drain_and_release_messages(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + pn_link_drain(lnk->receiver, 0); + wakeupDriver(); + //Not all implementations handle drain correctly, so limit the + //time spent waiting for it + qpid::sys::AbsTime until(qpid::sys::now(), qpid::sys::TIME_SEC*2); + while (pn_link_credit(lnk->receiver) > pn_link_queued(lnk->receiver) && until > qpid::sys::now()) { + QPID_LOG(debug, "Waiting for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); + waitUntil(ssn, lnk, until); + } + //release as yet unfetched messages: + for (pn_delivery_t* d = pn_link_current(lnk->receiver); d; d = pn_link_current(lnk->receiver)) { + pn_link_advance(lnk->receiver); + pn_delivery_update(d, PN_RELEASED); + pn_delivery_settle(d); + } +} + +void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + sys::Monitor::ScopedLock l(lock); + drain_and_release_messages(ssn, lnk); + if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) { + lnk->close(); + } + wakeupDriver(); + while (pn_link_state(lnk->receiver) & PN_REMOTE_ACTIVE) { + wait(ssn); + } + ssn->removeReceiver(lnk->getName()); +} + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + lnk->configure(); + attach(ssn, lnk->sender); + checkClosed(ssn, lnk); + lnk->verify(); + QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget()); +} + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + lnk->configure(); + attach(ssn, lnk->receiver, lnk->capacity); + checkClosed(ssn, lnk); + lnk->verify(); + QPID_LOG(debug, "Attach succeeded from " << lnk->getSource()); +} + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t* link, int credit) +{ + pn_link_open(link); + QPID_LOG(debug, "Link attach sent for " << link << ", state=" << pn_link_state(link)); + if (credit) pn_link_flow(link, credit); + wakeupDriver(); + while (pn_link_state(link) & PN_REMOTE_UNINIT) { + QPID_LOG(debug, "Waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link) << "..."); + wait(ssn); + } +} + +boost::shared_ptr<SenderContext> ConnectionContext::createSender(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address) +{ + sys::Monitor::ScopedLock l(lock); + boost::shared_ptr<SenderContext> sender = session->createSender(address, setToOnSend); + try { + attach(session, sender); + return sender; + } catch (...) { + session->removeSender(sender->getName()); + throw; + } + +} +boost::shared_ptr<ReceiverContext> ConnectionContext::createReceiver(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address) +{ + sys::Monitor::ScopedLock l(lock); + boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address); + try { + attach(session, receiver); + return receiver; + } catch (...) { + session->removeReceiver(receiver->getName()); + throw; + } +} +boost::shared_ptr<SenderContext> ConnectionContext::getSender(boost::shared_ptr<SessionContext> session, const std::string& name) const +{ + sys::Monitor::ScopedLock l(lock); + return session->getSender(name); +} + +boost::shared_ptr<ReceiverContext> ConnectionContext::getReceiver(boost::shared_ptr<SessionContext> session, const std::string& name) const +{ + sys::Monitor::ScopedLock l(lock); + return session->getReceiver(name); +} + +void ConnectionContext::send( + boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery) +{ + sys::Monitor::ScopedLock l(lock); + sendLH(ssn, snd, message, sync, delivery, l); +} + +void ConnectionContext::sendLH( + boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery, + sys::Monitor::ScopedLock&) +{ + checkClosed(ssn); + while (pn_transport_pending(engine) > 65536) { + QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written..."); + notifyOnWrite = true; + wakeupDriver(); + wait(ssn, snd); + notifyOnWrite = false; + } + while (!snd->send(message, delivery)) { + QPID_LOG(debug, "Waiting for capacity..."); + wait(ssn, snd);//wait for capacity + } + wakeupDriver(); + if (sync && *delivery) { + while (!(*delivery)->delivered()) { + QPID_LOG(debug, "Waiting for confirmation..."); + wait(ssn, snd);//wait until message has been confirmed + } + if ((*delivery)->rejected()) { + throw MessageRejected("Message was rejected by peer"); + } + + } +} + +void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity) +{ + sys::Monitor::ScopedLock l(lock); + sender->setCapacity(capacity); +} +uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender) +{ + sys::Monitor::ScopedLock l(lock); + return sender->getCapacity(); +} +uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender) +{ + sys::Monitor::ScopedLock l(lock); + return sender->getUnsettled(); +} + +void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity) +{ + sys::Monitor::ScopedLock l(lock); + receiver->setCapacity(capacity); + pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity()); + wakeupDriver(); +} +uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver) +{ + sys::Monitor::ScopedLock l(lock); + return receiver->getCapacity(); +} +uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver) +{ + sys::Monitor::ScopedLock l(lock); + return receiver->getAvailable(); +} +uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver) +{ + sys::Monitor::ScopedLock l(lock); + return receiver->getUnsettled(); +} + +void ConnectionContext::activateOutput() +{ + sys::Monitor::ScopedLock l(lock); + if (state == CONNECTED) wakeupDriver(); +} +/** + * Expects lock to be held by caller + */ +void ConnectionContext::wakeupDriver() +{ + switch (state) { + case CONNECTED: + haveOutput = true; + transport->activateOutput(); + QPID_LOG(debug, "wakeupDriver()"); + break; + case DISCONNECTED: + case CONNECTING: + QPID_LOG(error, "wakeupDriver() called while not connected"); + break; + } +} + +namespace { +pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; +pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED; +} + +void ConnectionContext::reset() +{ + pn_connection_free(connection); + pn_transport_free(engine); + + engine = pn_transport(); + connection = pn_connection(); + configureConnection(); + + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + i->second->reset(connection); + } +} + +bool ConnectionContext::check() { + if (checkDisconnected()) { + if (ConnectionOptions::reconnect) { + QPID_LOG(notice, "Auto-reconnecting to " << fullUrl); + autoconnect(); + QPID_LOG(notice, "Auto-reconnected to " << currentUrl); + } else { + throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)"); + } + return true; + } + return false; +} + +bool ConnectionContext::checkDisconnected() { + if (state == DISCONNECTED) { + reset(); + } else { + if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + std::string text = get_error_string(pn_connection_remote_condition(connection), "Connection closed by peer"); + pn_connection_close(connection); + throw qpid::messaging::ConnectionError(text); + } + } + return state == DISCONNECTED; +} + +void ConnectionContext::wait() +{ + if (check()) return; // Reconnected, may need to re-test condition. + lock.wait(); + check(); +} +void ConnectionContext::waitUntil(qpid::sys::AbsTime until) +{ + lock.wait(until); + check(); +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn) +{ + wait(); + checkClosed(ssn); +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + wait(); + checkClosed(ssn, lnk); +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + wait(); + checkClosed(ssn, lnk); +} +void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, qpid::sys::AbsTime until) +{ + waitUntil(until); + checkClosed(ssn); +} +void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::sys::AbsTime until) +{ + waitUntil(until); + checkClosed(ssn, lnk); +} +void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk, qpid::sys::AbsTime until) +{ + waitUntil(until); + checkClosed(ssn, lnk); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn) +{ + check(); + ssn->error.raise(); + if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + std::string text = get_error_string(pn_session_remote_condition(ssn->session), "Session ended by peer"); + pn_session_close(ssn->session); + throw qpid::messaging::SessionError(text); + } else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) { + throw qpid::messaging::SessionClosed(); + } +} + +bool ConnectionContext::isClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + try { + checkClosed(ssn, lnk->receiver); + return false; + } catch (const LinkError&) { + return true; + } +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + checkClosed(ssn, lnk->receiver); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + checkClosed(ssn, lnk->sender); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_link_t* lnk) +{ + checkClosed(ssn); + if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + pn_condition_t* error = pn_link_remote_condition(lnk); + std::string text = get_error_string(error, "Link detached by peer"); + pn_link_close(lnk); + std::string name = pn_condition_get_name(error); + if (name == qpid::amqp::error_conditions::NOT_FOUND) { + throw qpid::messaging::NotFound(text); + } else if (name == qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS) { + throw qpid::messaging::UnauthorizedAccess(text); + } else { + throw qpid::messaging::LinkError(text); + } + } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) { + throw qpid::messaging::LinkError("Link is not attached"); + } +} + +void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s) +{ + if (s->error) return; + pn_session_open(s->session); + wakeupDriver(); + while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { + wait(); + } + + for (SessionContext::SenderMap::iterator i = s->senders.begin(); i != s->senders.end(); ++i) { + QPID_LOG(debug, id << " reattaching sender " << i->first); + attach(s, i->second->sender); + i->second->verify(); + QPID_LOG(debug, id << " sender " << i->first << " reattached"); + i->second->resend(); + } + for (SessionContext::ReceiverMap::iterator i = s->receivers.begin(); i != s->receivers.end(); ++i) { + QPID_LOG(debug, id << " reattaching receiver " << i->first); + if (i->second->capacity) { + attach(s, i->second->receiver, i->second->capacity); + } else { + attach(s, i->second->receiver, (uint32_t) i->second->fetching); + } + i->second->verify(); + QPID_LOG(debug, id << " receiver " << i->first << " reattached"); + } + wakeupDriver(); +} + +boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) +{ + boost::shared_ptr<SessionContext> session; + std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n; + { + sys::Monitor::ScopedLock l(lock); + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + session = boost::shared_ptr<SessionContext>(new SessionContext(connection)); + session->setName(name); + pn_session_open(session->session); + wakeupDriver(); + sessions[name] = session; // Add it now so it will be restarted if we reconnect in wait() + while (pn_session_state(session->session) & PN_REMOTE_UNINIT) { + wait(); + } + } else { + throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); + } + + } + if (transactional) { // Outside of lock + startTxSession(session); + } + return session; +} + +boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const +{ + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + throw qpid::messaging::KeyError(std::string("No such session") + name); + } else { + return i->second; + } +} + +void ConnectionContext::setOption(const std::string& name, const qpid::types::Variant& value) +{ + set(name, value); +} + +std::string ConnectionContext::getAuthenticatedUsername() +{ + return sasl.get() ? sasl->getAuthenticatedUsername() : std::string(); +} + +std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) +{ + sys::Monitor::ScopedLock l(lock); + QPID_LOG(trace, id << " decode(" << size << ")"); + if (readHeader) { + size_t decoded = readProtocolHeader(buffer, size); + if (decoded < size) { + decoded += decode(buffer + decoded, size - decoded); + } + return decoded; + } + + //TODO: Fix pn_engine_input() to take const buffer + ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size); + if (n > 0 || n == PN_EOS) { + // PN_EOS either means we received a Close (which also means we've + // consumed all the input), OR some Very Bad Thing happened and this + // connection is toast. + if (n == PN_EOS) + { + std::string error; + if (checkTransportError(error)) { + // "He's dead, Jim." + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + transport->abort(); + return 0; + } else { + n = size; // assume all consumed + } + } + QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size) + pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); + lock.notifyAll(); + return n; + } else if (n == PN_ERR) { + std::string error; + checkTransportError(error); + QPID_LOG_CAT(error, network, id << " connection error: " << error); + transport->abort(); + return 0; + } else { + return 0; + } + +} +std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) +{ + sys::Monitor::ScopedLock l(lock); + QPID_LOG(trace, id << " encode(" << size << ")"); + if (writeHeader) { + size_t encoded = writeProtocolHeader(buffer, size); + if (encoded < size) { + encoded += encode(buffer + encoded, size - encoded); + } + return encoded; + } + + ssize_t n = pn_transport_output(engine, buffer, size); + if (n > 0) { + QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) + haveOutput = true; + if (notifyOnWrite) lock.notifyAll(); + return n; + } else if (n == PN_ERR) { + std::string error; + checkTransportError(error); + QPID_LOG_CAT(error, network, id << " connection error: " << error); + transport->abort(); + return 0; + } else if (n == PN_EOS) { + haveOutput = false; + // Normal close, or error? + std::string error; + if (checkTransportError(error)) { + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + transport->abort(); + } + return 0; + } else { + haveOutput = false; + return 0; + } +} +bool ConnectionContext::canEncodePlain() +{ + sys::Monitor::ScopedLock l(lock); + pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); + return haveOutput && state == CONNECTED; +} +void ConnectionContext::closed() +{ + sys::Monitor::ScopedLock l(lock); + state = DISCONNECTED; + lock.notifyAll(); +} +void ConnectionContext::opened() +{ + sys::Monitor::ScopedLock l(lock); + state = CONNECTED; + lock.notifyAll(); +} +bool ConnectionContext::isClosed() const +{ + return !isOpen(); +} +namespace { +qpid::framing::ProtocolVersion AMQP_1_0_PLAIN(1,0,qpid::framing::ProtocolVersion::AMQP); +} + +std::string ConnectionContext::getError() +{ + return get_error(connection, engine); +} + +framing::ProtocolVersion ConnectionContext::getVersion() const +{ + return AMQP_1_0_PLAIN; +} + +std::size_t ConnectionContext::readProtocolHeader(const char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(getVersion()); + if (size >= pi.encodedSize()) { + readHeader = false; + qpid::framing::Buffer out(const_cast<char*>(buffer), size); + pi.decode(out); + QPID_LOG_CAT(debug, protocol, id << " read protocol header: " << pi); + return pi.encodedSize(); + } else { + return 0; + } +} +std::size_t ConnectionContext::writeProtocolHeader(char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(getVersion()); + if (size >= pi.encodedSize()) { + QPID_LOG_CAT(debug, protocol, id << " writing protocol header: " << pi); + writeHeader = false; + qpid::framing::Buffer out(buffer, size); + pi.encode(out); + return pi.encodedSize(); + } else { + QPID_LOG_CAT(debug, protocol, id << " insufficient buffer for protocol header: " << size) + return 0; + } +} +bool ConnectionContext::useSasl() +{ + return !(mechanism == "none" || mechanism == "NONE" || mechanism == "None"); +} + +qpid::sys::Codec& ConnectionContext::getCodec() +{ + return *this; +} + +const qpid::messaging::ConnectionOptions* ConnectionContext::getOptions() +{ + return this; +} + +std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) +{ + sys::Monitor::ScopedLock l(lock); + size_t decoded = 0; + try { + if (sasl.get() && !sasl->authenticated()) { + decoded = sasl->decode(buffer, size); + if (!sasl->authenticated()) return decoded; + } + if (decoded < size) { + if (sasl.get() && sasl->getSecurityLayer()) decoded += sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded); + else decoded += decodePlain(buffer+decoded, size-decoded); + } + } catch (const AuthenticationFailure&) { + transport->close(); + } + return decoded; +} +std::size_t ConnectionContext::encode(char* buffer, std::size_t size) +{ + sys::Monitor::ScopedLock l(lock); + size_t encoded = 0; + try { + if (sasl.get() && sasl->canEncode()) { + encoded += sasl->encode(buffer, size); + if (!sasl->authenticated()) return encoded; + } + if (encoded < size) { + if (sasl.get() && sasl->getSecurityLayer()) encoded += sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded); + else encoded += encodePlain(buffer+encoded, size-encoded); + } + } catch (const AuthenticationFailure&) { + transport->close(); + } + return encoded; +} +bool ConnectionContext::canEncode() +{ + sys::Monitor::ScopedLock l(lock); + if (sasl.get()) { + try { + if (sasl->canEncode()) return true; + else if (!sasl->authenticated()) return false; + else if (sasl->getSecurityLayer()) return sasl->getSecurityLayer()->canEncode(); + } catch (const AuthenticationFailure&) { + transport->close(); + return false; + } + } + return canEncodePlain(); +} + +namespace { +const std::string CLIENT_PROCESS_NAME("qpid.client_process"); +const std::string CLIENT_PID("qpid.client_pid"); +const std::string CLIENT_PPID("qpid.client_ppid"); +} +void ConnectionContext::setProperties() +{ + PnData data(pn_connection_properties(connection)); + pn_data_put_map(data.data); + pn_data_enter(data.data); + data.putSymbol(CLIENT_PROCESS_NAME); + data.putSymbol(sys::SystemInfo::getProcessName()); + data.putSymbol(CLIENT_PID); + data.put(int32_t(sys::SystemInfo::getProcessId())); + data.putSymbol(CLIENT_PPID); + data.put(int32_t(sys::SystemInfo::getParentProcessId())); + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) + { + data.putSymbol(i->first); + data.put(i->second); + } + pn_data_exit(data.data); +} + +const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings() +{ + return transport ? transport->getSecuritySettings() : 0; +} + +void ConnectionContext::open() +{ + sys::Monitor::ScopedLock l(lock); + if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); + if (!driver) driver = DriverImpl::getDefault(); + QPID_LOG(info, "Starting connection to " << fullUrl); + autoconnect(); +} + + +namespace { +double FOREVER(std::numeric_limits<double>::max()); +bool expired(const sys::AbsTime& start, double timeout) +{ + if (timeout == 0) return true; + if (timeout == FOREVER) return false; + qpid::sys::Duration used(start, qpid::sys::now()); + qpid::sys::Duration allowed((int64_t)(timeout*qpid::sys::TIME_SEC)); + return allowed < used; +} +const std::string COLON(":"); +} + +void throwConnectFail(const Url& url, const std::string& msg) { + throw qpid::messaging::TransportFailure( + Msg() << "Connect failed to " << url << ": " << msg); +} + +void ConnectionContext::autoconnect() +{ + qpid::sys::AbsTime started(qpid::sys::now()); + for (double i = minReconnectInterval; !tryConnectUrl(fullUrl); i = std::min(i*2, maxReconnectInterval)) { + if (!ConnectionOptions::reconnect) throwConnectFail(fullUrl, "Reconnect disabled"); + if (limit >= 0 && retries++ >= limit) throwConnectFail(fullUrl, "Exceeded retries"); + if (expired(started, timeout)) throwConnectFail(fullUrl, "Exceeded timeout"); + QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds to" + << fullUrl); + qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds. + } + retries = 0; +} + +void ConnectionContext::reconnect(const Url& url) { + QPID_LOG(notice, "Reconnecting to " << url); + sys::Monitor::ScopedLock l(lock); + if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); + if (!driver) driver = DriverImpl::getDefault(); + reset(); + if (!tryConnectUrl(url)) throwConnectFail(url, "Failed to reconnect"); + QPID_LOG(notice, "Reconnected to " << currentUrl); +} + +void ConnectionContext::reconnect(const std::string& url) { reconnect(Url(url)); } + +void ConnectionContext::reconnect() { reconnect(fullUrl); } + +void ConnectionContext::waitNoReconnect() { + if (!checkDisconnected()) { + lock.wait(); + checkDisconnected(); + } +} + +// Try to connect to a URL, i.e. try to connect to each of its addresses in turn +// till one succeeds or they all fail. +// @return true if we connect successfully +bool ConnectionContext::tryConnectUrl(const Url& url) +{ + if (url.getUser().size()) username = url.getUser(); + if (url.getPass().size()) password = url.getPass(); + + for (Url::const_iterator i = url.begin(); i != url.end(); ++i) { + QPID_LOG(info, "Connecting to " << *i); + if (tryConnectAddr(*i) && tryOpenAddr(*i)) { + QPID_LOG(info, "Connected to " << *i); + return true; + } + } + return false; +} + +// Try to open an AMQP protocol connection on an address, after we have already +// established a transport connect (see tryConnectAddr below) +// @return true if the AMQP connection is succesfully opened. +bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) { + currentUrl = Url(addr); + if (sasl.get()) { + wakeupDriver(); + while (!sasl->authenticated() && state != DISCONNECTED) { + QPID_LOG(debug, id << " Waiting to be authenticated..."); + waitNoReconnect(); + } + if (state == DISCONNECTED) return false; + QPID_LOG(debug, id << " Authenticated"); + } + + QPID_LOG(debug, id << " Opening..."); + pn_connection_open(connection); + wakeupDriver(); //want to write + while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) && + state != DISCONNECTED) + waitNoReconnect(); + if (state == DISCONNECTED) return false; + if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { + throw qpid::messaging::ConnectionError("Failed to open connection"); + } + + // Connection open - check for idle timeout from the remote and start a + // periodic tick to monitor for idle connections + pn_timestamp_t remote = pn_transport_get_remote_idle_timeout(engine); + pn_timestamp_t local = pn_transport_get_idle_timeout(engine); + uint64_t shortest = ((remote && local) + ? std::min(remote, local) + : (remote) ? remote : local); + if (shortest) { + // send an idle frame at least twice before timeout + shortest = (shortest + 1)/2; + qpid::sys::Duration d(shortest * qpid::sys::TIME_MSEC); + ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(d, driver->getTimer(), *this)); + driver->getTimer().add(ticker); + QPID_LOG(debug, id << " AMQP 1.0 idle-timeout set:" + << " local=" << pn_transport_get_idle_timeout(engine) + << " remote=" << pn_transport_get_remote_idle_timeout(engine)); + } + + QPID_LOG(debug, id << " Opened"); + + return restartSessions(); +} + +std::string ConnectionContext::getUrl() const +{ + sys::Monitor::ScopedLock l(lock); + return (state == CONNECTED) ? currentUrl.str() : std::string(); +} + +// Try to establish a transport connect to an individual address (typically a +// TCP host:port) +// @return true if we succeed in connecting. +bool ConnectionContext::tryConnectAddr(const qpid::Address& address) +{ + transport = driver->getTransport(address.protocol, *this); + id = boost::lexical_cast<std::string>(address); + if (useSasl()) { + sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, address.host)); + } + state = CONNECTING; + try { + QPID_LOG(debug, id << " Connecting ..."); + transport->connect(address.host, boost::lexical_cast<std::string>(address.port)); + bool waiting(true); + while (waiting) { + switch (state) { + case CONNECTED: + QPID_LOG(debug, id << " Connected"); + return true; + case CONNECTING: + lock.wait(); + break; + case DISCONNECTED: + waiting = false; + break; + } + } + } catch (const std::exception& e) { + QPID_LOG(info, id << " Error while connecting: " << e.what()); + state = DISCONNECTED; + } + transport = boost::shared_ptr<Transport>(); + return false; +} + +bool ConnectionContext::restartSessions() +{ + try { + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + restartSession(i->second); + } + return true; + } catch (const qpid::TransportFailure& e) { + QPID_LOG(debug, "Connection Failed to re-initialize sessions: " << e.what()); + return false; + } +} + +void ConnectionContext::initSecurityLayer(qpid::sys::SecurityLayer& s) +{ + s.init(&codecAdapter); +} + +ConnectionContext::CodecAdapter::CodecAdapter(ConnectionContext& c) : context(c) {} +std::size_t ConnectionContext::CodecAdapter::decode(const char* buffer, std::size_t size) +{ + return context.decodePlain(buffer, size); +} +std::size_t ConnectionContext::CodecAdapter::encode(char* buffer, std::size_t size) +{ + return context.encodePlain(buffer, size); +} +bool ConnectionContext::CodecAdapter::canEncode() +{ + return context.canEncodePlain(); +} + +void ConnectionContext::startTxSession(boost::shared_ptr<SessionContext> session) { + try { + QPID_LOG(debug, id << " attaching transaction for " << session->getName()); + boost::shared_ptr<Transaction> tx(new Transaction(session->session)); + session->transaction = tx; + { + sys::Monitor::ScopedLock l(lock); + attach(session, boost::shared_ptr<SenderContext>(tx)); + } + tx->declare(boost::bind(&ConnectionContext::send, this, _1, _2, _3, _4, _5), session); + } catch (const Exception& e) { + throw TransactionError(Msg() << "Cannot start transaction: " << e.what()); + } +} + +void ConnectionContext::discharge(boost::shared_ptr<SessionContext> session, bool fail) { + { + sys::Monitor::ScopedLock l(lock); + checkClosed(session); + if (!session->transaction) + throw TransactionError("No Transaction"); + Transaction::SendFunction sendFn = boost::bind( + &ConnectionContext::sendLH, this, _1, _2, _3, _4, _5, boost::ref(l)); + syncLH(session, boost::ref(l)); // Sync to make sure all tx transfers have been received. + session->transaction->discharge(sendFn, session, fail); + session->transaction->declare(sendFn, session); + } +} + +void ConnectionContext::commit(boost::shared_ptr<SessionContext> session) { + discharge(session, false); +} + +void ConnectionContext::rollback(boost::shared_ptr<SessionContext> session) { + discharge(session, true); +} + + +// setup the transport and connection objects: +void ConnectionContext::configureConnection() +{ + pn_connection_set_container(connection, identifier.c_str()); + setProperties(); + if (heartbeat) { + // fail an idle connection at 2 x heartbeat (in msecs) + pn_transport_set_idle_timeout(engine, heartbeat*2*1000); + } + + bool enableTrace(false); + QPID_LOG_TEST_CAT(trace, protocol, enableTrace); + if (enableTrace) { + pn_transport_trace(engine, PN_TRACE_FRM); + set_tracer(engine, this); + } + + int err = pn_transport_bind(engine, connection); + if (err) + QPID_LOG(error, id << " Error binding connection and transport: " << err); +} + + +// check for failures of the transport: +bool ConnectionContext::checkTransportError(std::string& text) +{ + std::stringstream info; + +#ifdef USE_PROTON_TRANSPORT_CONDITION + pn_condition_t* tcondition = pn_transport_condition(engine); + if (pn_condition_is_set(tcondition)) + info << get_error_string(tcondition, "transport error", ": "); +#else + pn_error_t* terror = pn_transport_error(engine); + if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]"; +#endif + + text = info.str(); + return !text.empty(); +} +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h new file mode 100644 index 0000000000..ba3220c0ab --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -0,0 +1,233 @@ +#ifndef QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H +#define QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <deque> +#include <map> +#include <memory> +#include <string> +#include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> +#include "qpid/Url.h" +#include "qpid/messaging/ConnectionOptions.h" +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/Monitor.h" +#include "qpid/types/Variant.h" +#include "qpid/messaging/amqp/TransportContext.h" +#include "SenderContext.h" + +struct pn_connection_t; +struct pn_link_t; +struct pn_session_t; +struct pn_transport_t; + + +namespace qpid { +namespace framing { +class ProtocolVersion; +} +namespace sys { +class SecurityLayer; +struct SecuritySettings; +class TimerTask; +} +namespace messaging { +class Duration; +class Message; +namespace amqp { + +class DriverImpl; +class ReceiverContext; +class Sasl; +class SessionContext; +class Transport; + +/** + * + */ +class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messaging::ConnectionOptions, public TransportContext +{ + public: + ConnectionContext(const std::string& url, const qpid::types::Variant::Map& options); + ~ConnectionContext(); + void open(); + bool isOpen() const; + void close(); + boost::shared_ptr<SessionContext> newSession(bool transactional, const std::string& name); + boost::shared_ptr<SessionContext> getSession(const std::string& name) const; + void endSession(boost::shared_ptr<SessionContext>); + boost::shared_ptr<SenderContext> createSender(boost::shared_ptr<SessionContext>, const qpid::messaging::Address& address); + boost::shared_ptr<ReceiverContext> createReceiver(boost::shared_ptr<SessionContext>, const qpid::messaging::Address& address); + boost::shared_ptr<SenderContext> getSender(boost::shared_ptr<SessionContext>, const std::string& name) const; + boost::shared_ptr<ReceiverContext> getReceiver(boost::shared_ptr<SessionContext>, const std::string& name) const; + + void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); + void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + void drain_and_release_messages(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + bool isClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + + // Link operations + void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, + const qpid::messaging::Message& message, bool sync, + SenderContext::Delivery** delivery); + + bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + + // Session operations + void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative); + void commit(boost::shared_ptr<SessionContext> ssn); + void rollback(boost::shared_ptr<SessionContext> ssn); + + void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject); + void sync(boost::shared_ptr<SessionContext> ssn); + boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout); + + void setOption(const std::string& name, const qpid::types::Variant& value); + std::string getAuthenticatedUsername(); + + // Link operations + void setCapacity(boost::shared_ptr<SenderContext>, uint32_t); + uint32_t getCapacity(boost::shared_ptr<SenderContext>); + uint32_t getUnsettled(boost::shared_ptr<SenderContext>); + void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t); + uint32_t getCapacity(boost::shared_ptr<ReceiverContext>); + uint32_t getAvailable(boost::shared_ptr<ReceiverContext>); + uint32_t getUnsettled(boost::shared_ptr<ReceiverContext>); + + + void activateOutput(); + qpid::sys::Codec& getCodec(); + const qpid::messaging::ConnectionOptions* getOptions(); + //ConnectionCodec interface: + std::size_t decode(const char* buffer, std::size_t size); + std::size_t encode(char* buffer, std::size_t size); + bool canEncode(); + void closed(); + bool isClosed() const; + framing::ProtocolVersion getVersion() const; + //additionally, Transport needs: + void opened();//signal successful connection + void reconnect(const std::string& url); + void reconnect(); + std::string getUrl() const; + const qpid::sys::SecuritySettings* getTransportSecuritySettings(); + void initSecurityLayer(qpid::sys::SecurityLayer&); + void trace(const char*) const; + + private: + typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap; + class CodecAdapter : public qpid::sys::Codec + { + public: + CodecAdapter(ConnectionContext&); + std::size_t decode(const char* buffer, std::size_t size); + std::size_t encode(char* buffer, std::size_t size); + bool canEncode(); + private: + ConnectionContext& context; + }; + + Url fullUrl; // Combined URL of all known addresses. + Url currentUrl; // URL of currently connected address. + + boost::shared_ptr<DriverImpl> driver; + boost::shared_ptr<Transport> transport; + + pn_transport_t* engine; + pn_connection_t* connection; + SessionMap sessions; + mutable qpid::sys::Monitor lock; + bool writeHeader; + bool readHeader; + bool haveOutput; + std::string id; + enum { + DISCONNECTED, + CONNECTING, + CONNECTED + } state; + std::auto_ptr<Sasl> sasl; + CodecAdapter codecAdapter; + bool notifyOnWrite; + boost::intrusive_ptr<qpid::sys::TimerTask> ticker; + + bool check(); + bool checkDisconnected(); + void waitNoReconnect(); + + // NOTE: All wait*() functions must be called in a loop that checks for the + // waited condition with the lock held. + void wait(); + void waitUntil(qpid::sys::AbsTime until); + void wait(boost::shared_ptr<SessionContext>); + void waitUntil(boost::shared_ptr<SessionContext>, qpid::sys::AbsTime until); + void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); + void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>, qpid::sys::AbsTime until); + void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>, qpid::sys::AbsTime until); + + void checkClosed(boost::shared_ptr<SessionContext>); + void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); + void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*); + + void wakeupDriver(); + void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0); + void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); + void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + void autoconnect(); + bool tryConnectUrl(const qpid::Url& url); + bool tryOpenAddr(const qpid::Address& address); + bool tryConnectAddr(const qpid::Address& address); + void reconnect(const Url& url); + void reset(); + bool restartSessions(); + void restartSession(boost::shared_ptr<SessionContext>); + + std::size_t decodePlain(const char* buffer, std::size_t size); + std::size_t encodePlain(char* buffer, std::size_t size); + bool canEncodePlain(); + + std::size_t readProtocolHeader(const char* buffer, std::size_t size); + std::size_t writeProtocolHeader(char* buffer, std::size_t size); + std::string getError(); + bool useSasl(); + void setProperties(); + + void configureConnection(); + bool checkTransportError(std::string&); + + void discharge(boost::shared_ptr<SessionContext>, bool fail); + void startTxSession(boost::shared_ptr<SessionContext>); + + void syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&); + void sendLH(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, + const qpid::messaging::Message& message, bool sync, + SenderContext::Delivery** delivery, sys::Monitor::ScopedLock&); + void acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&); +}; + +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp new file mode 100644 index 0000000000..90227fa29b --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionHandle.h" +#include "ConnectionContext.h" +#include "SessionHandle.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/ProtocolRegistry.h" + +namespace qpid { +namespace messaging { +namespace amqp { +// Static constructor which registers this implementation in the ProtocolRegistry +namespace { +ConnectionImpl* create(const std::string& u, const qpid::types::Variant::Map& o) +{ + try { + return new ConnectionHandle(u, o); + } catch (const types::Exception& ) { + throw; + } catch (const qpid::Exception& e) { + throw messaging::ConnectionError( e.what() ); + } +} + +struct StaticInit +{ + StaticInit() + { + ProtocolRegistry::add("amqp1.0", &create); + }; +} init; +} + +ConnectionHandle::ConnectionHandle(const std::string& url, const qpid::types::Variant::Map& options) : connection(new ConnectionContext(url, options)) {} +ConnectionHandle::ConnectionHandle(boost::shared_ptr<ConnectionContext> c) : connection(c) {} + +void ConnectionHandle::open() +{ + connection->open(); +} + +bool ConnectionHandle::isOpen() const +{ + return connection->isOpen(); +} + +void ConnectionHandle::close() +{ + connection->close(); +} + +Session ConnectionHandle::newSession(bool transactional, const std::string& name) +{ + return qpid::messaging::Session(new SessionHandle(connection, connection->newSession(transactional, name))); +} + +Session ConnectionHandle::getSession(const std::string& name) const +{ + return qpid::messaging::Session(new SessionHandle(connection, connection->getSession(name))); +} + +void ConnectionHandle::setOption(const std::string& name, const qpid::types::Variant& value) +{ + connection->setOption(name, value); +} + +std::string ConnectionHandle::getAuthenticatedUsername() +{ + return connection->getAuthenticatedUsername(); +} + +void ConnectionHandle::reconnect(const std::string& url) +{ + connection->reconnect(url); +} +void ConnectionHandle::reconnect() +{ + connection->reconnect(); +} +std::string ConnectionHandle::getUrl() const +{ + return connection->getUrl(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h new file mode 100644 index 0000000000..0238313f93 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h @@ -0,0 +1,61 @@ +#ifndef QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H +#define QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/ConnectionImpl.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +/** + * Handles are directly referenced by applications; Contexts are + * referenced by Handles. This allows a graph structure that + * remains intact as long as the application references any part + * of it, but that can be automatically reclaimed if the whole + * graph becomes unreferenced. + */ +class ConnectionHandle : public qpid::messaging::ConnectionImpl +{ + public: + ConnectionHandle(const std::string& url, const qpid::types::Variant::Map& options); + ConnectionHandle(boost::shared_ptr<ConnectionContext>); + void open(); + bool isOpen() const; + void close(); + Session newSession(bool transactional, const std::string& name); + Session getSession(const std::string& name) const; + void setOption(const std::string& name, const qpid::types::Variant& value); + std::string getAuthenticatedUsername(); + void reconnect(const std::string& url); + void reconnect(); + std::string getUrl() const; + private: + boost::shared_ptr<ConnectionContext> connection; +}; + +}}} // namespace qpid::messaging::amqp_1.0 + +#endif /*!QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp new file mode 100644 index 0000000000..ebe3fff1cb --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DriverImpl.h" +#include "Transport.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Timer.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +DriverImpl::DriverImpl() : poller(new qpid::sys::Poller), timer(new qpid::sys::Timer) +{ + start(); +} +DriverImpl::~DriverImpl() +{ + stop(); +} + +void DriverImpl::start() +{ + thread = qpid::sys::Thread(*poller); + QPID_LOG(debug, "Driver started"); +} + +void DriverImpl::stop() +{ + QPID_LOG(debug, "Driver stopped"); + poller->shutdown(); + thread.join(); + timer->stop(); +} + +boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection) +{ + boost::shared_ptr<Transport> t(Transport::create(protocol, connection, poller)); + if (!t) throw qpid::messaging::ConnectionError("No such transport: " + protocol); + return t; +} + + +qpid::sys::Mutex DriverImpl::defaultLock; +boost::weak_ptr<DriverImpl> DriverImpl::theDefault; +boost::shared_ptr<DriverImpl> DriverImpl::getDefault() +{ + qpid::sys::Mutex::ScopedLock l(defaultLock); + boost::shared_ptr<DriverImpl> p = theDefault.lock(); + if (!p) { + p = boost::shared_ptr<DriverImpl>(new DriverImpl); + theDefault = p; + } + return p; +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h new file mode 100644 index 0000000000..36cb196343 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h @@ -0,0 +1,64 @@ +#ifndef QPID_MESSAGING_AMQP_DRIVERIMPL_H +#define QPID_MESSAGING_AMQP_DRIVERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Thread.h" +#include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> + +namespace qpid { +namespace sys { +class Poller; +class Timer; +} +namespace messaging { +namespace amqp { +class TransportContext; +class Transport; +/** + * + */ +class DriverImpl +{ + public: + DriverImpl(); + ~DriverImpl(); + + void start(); + void stop(); + + boost::shared_ptr<Transport> getTransport(const std::string& protocol, TransportContext& connection); + sys::Timer& getTimer() { return *timer; } + + static boost::shared_ptr<DriverImpl> getDefault(); + private: + boost::shared_ptr<qpid::sys::Poller> poller; + qpid::sys::Thread thread; + std::auto_ptr<sys::Timer> timer; + + static qpid::sys::Mutex defaultLock; + static boost::weak_ptr<DriverImpl> theDefault; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_DRIVERIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp new file mode 100644 index 0000000000..cf60046245 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp @@ -0,0 +1,366 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/EncodedMessage.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/Exception.h" +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/DataBuilder.h" +#include "qpid/amqp/ListBuilder.h" +#include "qpid/amqp/MapBuilder.h" +#include "qpid/amqp/typecodes.h" +#include "qpid/types/encodings.h" +#include "qpid/log/Statement.h" +#include <boost/lexical_cast.hpp> +#include <string.h> + +namespace qpid { +namespace messaging { +namespace amqp { +using namespace qpid::amqp; + +EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0), nestAnnotations(false) +{ + init(); +} + +EncodedMessage::EncodedMessage() : size(0), data(0), nestAnnotations(false) +{ + init(); +} + +EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0), nestAnnotations(false) +{ + init(); +} + +void EncodedMessage::init() +{ + //init all CharSequence members + deliveryAnnotations.init(); + messageAnnotations.init(); + userId.init(); + to.init(); + subject.init(); + replyTo.init(); + contentType.init(); + contentEncoding.init(); + groupId.init(); + replyToGroupId.init(); + applicationProperties.init(); + body.init(); + footer.init(); +} + +EncodedMessage::~EncodedMessage() +{ + delete[] data; +} + +size_t EncodedMessage::getSize() const +{ + return size; +} +void EncodedMessage::trim(size_t t) +{ + size = t; +} +void EncodedMessage::resize(size_t s) +{ + delete[] data; + size = s; + data = new char[size]; +} + +char* EncodedMessage::getData() +{ + return data; +} +const char* EncodedMessage::getData() const +{ + return data; +} + +void EncodedMessage::init(qpid::messaging::MessageImpl& impl) +{ + try { + //initial scan of raw data + qpid::amqp::Decoder decoder(data, size); + InitialScan reader(*this, impl); + decoder.read(reader); + bareMessage = reader.getBareMessage(); + if (bareMessage.data && !bareMessage.size) { + bareMessage.size = (data + size) - bareMessage.data; + } + } catch (const qpid::Exception& e) { + throw FetchError(e.what()); + } +} +void EncodedMessage::setNestAnnotationsOption(bool b) { nestAnnotations = b; } + +namespace { +using qpid::types::Variant; +void merge(qpid::types::Variant::Map& map, const qpid::types::Variant::Map& additions) +{ + for (Variant::Map::const_iterator i = additions.begin(); i != additions.end(); ++i) + { + if (map.find(i->first) == map.end()) { + map[i->first] = i->second; + } else { + QPID_LOG(info, "Annotation " << i->first << " hidden by application property of the same name (consider using nest_annotations option?)"); + } + } +} +} + +void EncodedMessage::populate(qpid::types::Variant::Map& map) const +{ + try { + //decode application properties + if (applicationProperties) { + qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size); + decoder.readMap(map); + } + //add in 'x-amqp-' prefixed values + if (!!firstAcquirer) { + map["x-amqp-first-acquirer"] = firstAcquirer.get(); + } + if (!!deliveryCount) { + map["x-amqp-delivery-count"] = deliveryCount.get(); + } + if (to) { + map["x-amqp-to"] = to.str(); + } + if (contentEncoding) { + map["x-amqp-content-encoding"] = contentEncoding.str(); + } + if (!!absoluteExpiryTime) { + map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get(); + } + if (!!creationTime) { + map["x-amqp-creation-time"] = creationTime.get(); + } + if (groupId) { + map["x-amqp-group-id"] = groupId.str(); + } + if (!!groupSequence) { + map["x-amqp-group-sequence"] = groupSequence.get(); + } + if (replyToGroupId) { + map["x-amqp-reply-to-group-id"] = replyToGroupId.str(); + } + //add in any annotations + if (deliveryAnnotations) { + qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size); + if (nestAnnotations) { + map["x-amqp-delivery-annotations"] = decoder.readMap(); + } else { + merge(map, decoder.readMap()); + } + } + if (messageAnnotations) { + qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size); + if (nestAnnotations) { + map["x-amqp-message-annotations"] = decoder.readMap(); + } else { + merge(map, decoder.readMap()); + } + } + } catch (const qpid::Exception& e) { + throw FetchError(e.what()); + } +} +qpid::amqp::CharSequence EncodedMessage::getBareMessage() const +{ + return bareMessage; +} + +void EncodedMessage::getReplyTo(qpid::messaging::Address& a) const +{ + std::string rt = replyTo.str(); + std::string::size_type i = rt.find('/'); + if (i != std::string::npos && i > 0 && rt.find('/', i+1) == std::string::npos) { + //handle <name>/<subject> special case + a.setName(rt.substr(0, i)); + a.setSubject(rt.substr(i+1)); + } else { + a.setName(rt); + } +} +void EncodedMessage::getSubject(std::string& s) const +{ + s.assign(subject.data, subject.size); +} +void EncodedMessage::getContentType(std::string& s) const +{ + s.assign(contentType.data, contentType.size); +} +void EncodedMessage::getUserId(std::string& s) const +{ + s.assign(userId.data, userId.size); +} +void EncodedMessage::getMessageId(std::string& s) const +{ + messageId.assign(s); +} +void EncodedMessage::getCorrelationId(std::string& s) const +{ + correlationId.assign(s); +} +void EncodedMessage::getBody(std::string& raw, qpid::types::Variant& c) const +{ + try { + if (!content.isVoid()) { + c = content;//integer types, floats, bool etc + //TODO: populate raw data? + } else { + if (bodyType.empty() + || bodyType == qpid::amqp::typecodes::BINARY_NAME + || bodyType == qpid::types::encodings::UTF8 + || bodyType == qpid::types::encodings::ASCII) + { + c = std::string(body.data, body.size); + c.setEncoding(bodyType); + } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) { + qpid::amqp::ListBuilder builder; + qpid::amqp::Decoder decoder(body.data, body.size); + decoder.read(builder); + c = builder.getList(); + raw.assign(body.data, body.size); + } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) { + qpid::types::Variant v = qpid::types::Variant::Map(); + qpid::amqp::DataBuilder builder(v); + qpid::amqp::Decoder decoder(body.data, body.size); + decoder.read(builder); + c = builder.getValue().asMap(); + raw.assign(body.data, body.size); + } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) { + if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data); + raw.assign(body.data, body.size); + } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) { + raw.assign(body.data, body.size); + } + } + } catch (const qpid::Exception& e) { + throw FetchError(e.what()); + } +} + +qpid::amqp::CharSequence EncodedMessage::getBody() const +{ + return body; +} + +bool EncodedMessage::hasHeaderChanged(const qpid::messaging::MessageImpl& msg) const +{ + if (!durable) { + if (msg.isDurable()) return true; + } else { + if (durable.get() != msg.isDurable()) return true; + } + + if (!priority) { + if (msg.getPriority() != 4) return true; + } else { + if (priority.get() != msg.getPriority()) return true; + } + + if (msg.getTtl() && (!ttl || msg.getTtl() != ttl.get())) { + return true; + } + + //first-acquirer can't be changed via Message interface as yet + + if (msg.isRedelivered() && (!deliveryCount || deliveryCount.get() == 0)) { + return true; + } + + return false; +} + + + +EncodedMessage::InitialScan::InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m) : em(e), mi(m) +{ + //set up defaults as needed: + mi.setPriority(4); +} +//header: +void EncodedMessage::InitialScan::onDurable(bool b) { mi.setDurable(b); em.durable = b; } +void EncodedMessage::InitialScan::onPriority(uint8_t i) { mi.setPriority(i); em.priority = i; } +void EncodedMessage::InitialScan::onTtl(uint32_t i) { mi.setTtl(i); em.ttl = i; } +void EncodedMessage::InitialScan::onFirstAcquirer(bool b) { em.firstAcquirer = b; } +void EncodedMessage::InitialScan::onDeliveryCount(uint32_t i) +{ + mi.setRedelivered(i); + em.deliveryCount = i; +} + +//properties: +void EncodedMessage::InitialScan::onMessageId(uint64_t v) { em.messageId.set(v); } +void EncodedMessage::InitialScan::onMessageId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.messageId.set(v, t); } +void EncodedMessage::InitialScan::onUserId(const qpid::amqp::CharSequence& v) { em.userId = v; } +void EncodedMessage::InitialScan::onTo(const qpid::amqp::CharSequence& v) { em.to = v; } +void EncodedMessage::InitialScan::onSubject(const qpid::amqp::CharSequence& v) { em.subject = v; } +void EncodedMessage::InitialScan::onReplyTo(const qpid::amqp::CharSequence& v) { em.replyTo = v;} +void EncodedMessage::InitialScan::onCorrelationId(uint64_t v) { em.correlationId.set(v); } +void EncodedMessage::InitialScan::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.correlationId.set(v, t); } +void EncodedMessage::InitialScan::onContentType(const qpid::amqp::CharSequence& v) { em.contentType = v; } +void EncodedMessage::InitialScan::onContentEncoding(const qpid::amqp::CharSequence& v) { em.contentEncoding = v; } +void EncodedMessage::InitialScan::onAbsoluteExpiryTime(int64_t i) { em.absoluteExpiryTime = i; } +void EncodedMessage::InitialScan::onCreationTime(int64_t i) { em.creationTime = i; } +void EncodedMessage::InitialScan::onGroupId(const qpid::amqp::CharSequence& v) { em.groupId = v; } +void EncodedMessage::InitialScan::onGroupSequence(uint32_t i) { em.groupSequence = i; } +void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequence& v) { em.replyToGroupId = v; } + +void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.applicationProperties = v; } +void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.deliveryAnnotations = v; } +void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.messageAnnotations = v; } + +void EncodedMessage::InitialScan::onData(const qpid::amqp::CharSequence& v) +{ + em.body = v; +} +void EncodedMessage::InitialScan::onAmqpSequence(const qpid::amqp::CharSequence& v) +{ + em.body = v; + em.bodyType = qpid::amqp::typecodes::LIST_NAME; +} +void EncodedMessage::InitialScan::onAmqpValue(const qpid::amqp::CharSequence& v, const std::string& type, const qpid::amqp::Descriptor*) +{ + em.body = v; + if (type == qpid::amqp::typecodes::STRING_NAME) { + em.bodyType = qpid::types::encodings::UTF8; + } else if (type == qpid::amqp::typecodes::SYMBOL_NAME) { + em.bodyType = qpid::types::encodings::ASCII; + } else { + em.bodyType = type; + } +} +void EncodedMessage::InitialScan::onAmqpValue(const qpid::types::Variant& v, const qpid::amqp::Descriptor*) +{ + em.content = v; +} + +void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.footer = v; } + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h new file mode 100644 index 0000000000..241118386c --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h @@ -0,0 +1,190 @@ +#ifndef QPID_MESSAGING_AMQP_ENCODEDMESSAGE_H +#define QPID_MESSAGING_AMQP_ENCODEDMESSAGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/messaging/ImportExport.h" + +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/MessageId.h" +#include "qpid/amqp/MessageReader.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/types/Variant.h" +#include <boost/optional.hpp> + +namespace qpid { +namespace amqp { +struct Descriptor; +} +namespace messaging { +class Address; +class MessageImpl; +namespace amqp { + +/** + * Used to 'lazy-decode' an AMQP 1.0 message. + * + * There are four categories of data item: + * + * (i) simple, fixed width primitives - priority, ttl, durability, + * delivery count - for which lazy-decoding doesn't buy much. These + * are decoded unconditionally on an initial scan of the message. + * + * (ii) standard variable length string properties - subject, + * message-id, user-id etc - which require conversion to a std::string + * for returning to the application. By delaying the conversion of + * these to a std::string we can avoid allocation & copying until it + * is actually required. The initial scan of the message merely + * records the position of these strings within the raw message data. + * + * (iii) custom, application defined headers. These form a map, and + * again, delaying the creation of that map until it is actually + * required can be advantageous. The initial scan of the message merely + * records the position of this section within the raw message data. + * + * (iv) the body content. This may be retreived as a std::string, or + * as a char*. Avoiding conversion to the string until it is required + * is advantageous. The initial scan of the message merely records the + * position of this section within the raw message data. + * + * At present the Message class only explicitly exposes some of the + * standard property and headers defined by AMQP 1.0. The remainder + * will have to be accessed through the message 'headers' map, using + * the 'x-amqp-' prefix. + */ +class EncodedMessage +{ + public: + QPID_MESSAGING_EXTERN EncodedMessage(); + QPID_MESSAGING_EXTERN EncodedMessage(size_t); + QPID_MESSAGING_EXTERN EncodedMessage(const EncodedMessage&); + QPID_MESSAGING_EXTERN ~EncodedMessage(); + + + QPID_MESSAGING_EXTERN size_t getSize() const; + QPID_MESSAGING_EXTERN char* getData(); + QPID_MESSAGING_EXTERN const char* getData() const; + QPID_MESSAGING_EXTERN void trim(size_t); + QPID_MESSAGING_EXTERN void resize(size_t); + + QPID_MESSAGING_EXTERN void setNestAnnotationsOption(bool); + void getReplyTo(qpid::messaging::Address&) const; + void getSubject(std::string&) const; + void getContentType(std::string&) const; + void getMessageId(std::string&) const; + void getUserId(std::string&) const; + void getCorrelationId(std::string&) const; + void populate(qpid::types::Variant::Map&) const; + void getBody(std::string&, qpid::types::Variant&) const; + + QPID_MESSAGING_EXTERN void init(qpid::messaging::MessageImpl&); + QPID_MESSAGING_EXTERN qpid::amqp::CharSequence getBareMessage() const; + qpid::amqp::CharSequence getBody() const; + QPID_MESSAGING_EXTERN bool hasHeaderChanged(const qpid::messaging::MessageImpl&) const; + private: + size_t size; + char* data; + bool nestAnnotations; + + class InitialScan : public qpid::amqp::MessageReader + { + public: + InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m); + //header: + void onDurable(bool b); + void onPriority(uint8_t i); + void onTtl(uint32_t i); + void onFirstAcquirer(bool b); + void onDeliveryCount(uint32_t i); + //properties: + void onMessageId(uint64_t); + void onMessageId(const qpid::amqp::CharSequence&, qpid::types::VariantType); + void onUserId(const qpid::amqp::CharSequence& v); + void onTo(const qpid::amqp::CharSequence& v); + void onSubject(const qpid::amqp::CharSequence& v); + void onReplyTo(const qpid::amqp::CharSequence& v); + void onCorrelationId(uint64_t); + void onCorrelationId(const qpid::amqp::CharSequence&, qpid::types::VariantType); + void onContentType(const qpid::amqp::CharSequence& v); + void onContentEncoding(const qpid::amqp::CharSequence& v); + void onAbsoluteExpiryTime(int64_t i); + void onCreationTime(int64_t); + void onGroupId(const qpid::amqp::CharSequence&); + void onGroupSequence(uint32_t); + void onReplyToGroupId(const qpid::amqp::CharSequence&); + + void onApplicationProperties(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); + void onDeliveryAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); + void onMessageAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); + + void onData(const qpid::amqp::CharSequence&); + void onAmqpSequence(const qpid::amqp::CharSequence&); + void onAmqpValue(const qpid::amqp::CharSequence&, const std::string& type, const qpid::amqp::Descriptor*); + void onAmqpValue(const qpid::types::Variant&, const qpid::amqp::Descriptor*); + + void onFooter(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); + private: + EncodedMessage& em; + qpid::messaging::MessageImpl& mi; + }; + //header: + boost::optional<bool> durable; + boost::optional<uint8_t> priority; + boost::optional<uint32_t> ttl; + boost::optional<bool> firstAcquirer; + boost::optional<uint32_t> deliveryCount; + //annotations: + qpid::amqp::CharSequence deliveryAnnotations; + qpid::amqp::CharSequence messageAnnotations; + + qpid::amqp::CharSequence bareMessage;//properties, application-properties and content + //properties: + qpid::amqp::MessageId messageId; + qpid::amqp::CharSequence userId; + qpid::amqp::CharSequence to; + qpid::amqp::CharSequence subject; + qpid::amqp::CharSequence replyTo; + qpid::amqp::MessageId correlationId; + qpid::amqp::CharSequence contentType; + qpid::amqp::CharSequence contentEncoding; + boost::optional<int64_t> absoluteExpiryTime; + boost::optional<int64_t> creationTime; + qpid::amqp::CharSequence groupId; + boost::optional<uint32_t> groupSequence; + qpid::amqp::CharSequence replyToGroupId; + //application-properties: + qpid::amqp::CharSequence applicationProperties; + //application data: + qpid::amqp::CharSequence body; + std::string bodyType; + qpid::types::Variant content; + + //footer: + qpid::amqp::CharSequence footer; + + void init(); + //not implemented: + EncodedMessage& operator=(const EncodedMessage&); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_ENCODEDMESSAGE_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp new file mode 100644 index 0000000000..3309d1a683 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp @@ -0,0 +1,246 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PnData.h" +#include "qpid/types/encodings.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +using types::Variant; +using namespace types::encodings; + +// TODO aconway 2014-11-20: PnData duplicates functionality of qpid::amqp::Encoder,Decoder. +// Collapse them all into a single proton-based codec. + +void PnData::put(const Variant::Map& map) +{ + pn_data_put_map(data); + pn_data_enter(data); + for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { + pn_data_put_string(data, bytes(i->first)); + put(i->second); + } + pn_data_exit(data); +} + +void PnData::put(const Variant::List& list) +{ + pn_data_put_list(data); + pn_data_enter(data); + for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + put(*i); + } + pn_data_exit(data); +} + +void PnData::put(const Variant& value) +{ + // Open data descriptors associated with the value. + const Variant::List& descriptors = value.getDescriptors(); + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) { + pn_data_put_described(data); + pn_data_enter(data); + if (i->getType() == types::VAR_STRING) + pn_data_put_symbol(data, bytes(i->asString())); + else + pn_data_put_ulong(data, i->asUint64()); + } + + // Put the variant value + switch (value.getType()) { + case qpid::types::VAR_VOID: + pn_data_put_null(data); + break; + case qpid::types::VAR_BOOL: + pn_data_put_bool(data, value.asBool()); + break; + case qpid::types::VAR_UINT64: + pn_data_put_ulong(data, value.asUint64()); + break; + case qpid::types::VAR_INT64: + pn_data_put_long(data, value.asInt64()); + break; + case qpid::types::VAR_DOUBLE: + pn_data_put_double(data, value.asDouble()); + break; + case qpid::types::VAR_STRING: + if (value.getEncoding() == ASCII) + pn_data_put_symbol(data, bytes(value.asString())); + else if (value.getEncoding() == BINARY) + pn_data_put_binary(data, bytes(value.asString())); + else + pn_data_put_string(data, bytes(value.asString())); + break; + case qpid::types::VAR_MAP: + put(value.asMap()); + break; + case qpid::types::VAR_LIST: + put(value.asList()); + break; + default: + break; + } + + // Close any descriptors. + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) + pn_data_exit(data); +} + +bool PnData::get(qpid::types::Variant& value) +{ + return get(pn_data_type(data), value); +} + +void PnData::getList(qpid::types::Variant::List& value) +{ + size_t count = pn_data_get_list(data); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + qpid::types::Variant e; + if (get(e)) value.push_back(e); + } + pn_data_exit(data); +} + +void PnData::getMap(qpid::types::Variant::Map& value) +{ + size_t count = pn_data_get_list(data); + pn_data_enter(data); + for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) { + std::string key = string(pn_data_get_symbol(data)); + pn_data_next(data); + qpid::types::Variant e; + if (get(e)) value[key]= e; + } + pn_data_exit(data); +} + +void PnData::getArray(qpid::types::Variant::List& value) +{ + size_t count = pn_data_get_array(data); + pn_type_t type = pn_data_get_array_type(data); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + qpid::types::Variant e; + if (get(type, e)) value.push_back(e); + } + pn_data_exit(data); +} + +bool PnData::get(pn_type_t type, qpid::types::Variant& value) +{ + switch (type) { + case PN_NULL: + if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant(); + return true; + case PN_BOOL: + value = pn_data_get_bool(data); + return true; + case PN_UBYTE: + value = pn_data_get_ubyte(data); + return true; + case PN_BYTE: + value = pn_data_get_byte(data); + return true; + case PN_USHORT: + value = pn_data_get_ushort(data); + return true; + case PN_SHORT: + value = pn_data_get_short(data); + return true; + case PN_UINT: + value = pn_data_get_uint(data); + return true; + case PN_INT: + value = pn_data_get_int(data); + return true; + case PN_CHAR: + value = pn_data_get_char(data); + return true; + case PN_ULONG: + value = pn_data_get_ulong(data); + return true; + case PN_LONG: + value = pn_data_get_long(data); + return true; + case PN_TIMESTAMP: + value = pn_data_get_timestamp(data); + return true; + case PN_FLOAT: + value = pn_data_get_float(data); + return true; + case PN_DOUBLE: + value = pn_data_get_double(data); + return true; + case PN_UUID: + value = qpid::types::Uuid(pn_data_get_uuid(data).bytes); + return true; + case PN_BINARY: + value = string(pn_data_get_binary(data)); + value.setEncoding(qpid::types::encodings::BINARY); + return true; + case PN_STRING: + value = string(pn_data_get_string(data)); + value.setEncoding(qpid::types::encodings::UTF8); + return true; + case PN_SYMBOL: + value = string(pn_data_get_string(data)); + value.setEncoding(qpid::types::encodings::ASCII); + return true; + case PN_LIST: + value = qpid::types::Variant::List(); + getList(value.asList()); + return true; + break; + case PN_MAP: + value = qpid::types::Variant::Map(); + getMap(value.asMap()); + return true; + case PN_ARRAY: + value = qpid::types::Variant::List(); + getArray(value.asList()); + return true; + case PN_DESCRIBED: + // TODO aconway 2014-11-20: get described values. + case PN_DECIMAL32: + case PN_DECIMAL64: + case PN_DECIMAL128: + default: + return false; + } +} + +pn_bytes_t PnData::bytes(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; +} + +std::string PnData::string(const pn_bytes_t& in) +{ + return std::string(in.start, in.size); +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.h b/qpid/cpp/src/qpid/messaging/amqp/PnData.h new file mode 100644 index 0000000000..b0119f88fd --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.h @@ -0,0 +1,61 @@ +#ifndef QPID_MESSAGING_AMQP_PNDATA_H +#define QPID_MESSAGING_AMQP_PNDATA_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/types/Variant.h" +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { + +/** + * Helper class to put/get messaging types to/from pn_data_t. + */ +class PnData +{ + public: + pn_data_t* data; + + PnData(pn_data_t* d) : data(d) {} + + void put(const types::Variant& value); + void put(const types::Variant::Map& map); + void put(const types::Variant::List& list); + void put(int32_t n) { pn_data_put_int(data, n); } + void putSymbol(const std::string& symbol) { pn_data_put_symbol(data, bytes(symbol)); } + + bool get(pn_type_t type, types::Variant& value); + bool get(types::Variant& value); + void getList(types::Variant::List& value); + void getMap(types::Variant::Map& value); + void getArray(types::Variant::List& value); + + static pn_bytes_t bytes(const std::string&); + static std::string string(const pn_bytes_t&); +}; +}}} // namespace messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_PNDATA_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp new file mode 100644 index 0000000000..a28509b0b1 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/ReceiverContext.h" +#include "qpid/messaging/AddressImpl.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +#include "qpid/log/Statement.h" +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { +//TODO: proper conversion to wide string for address +ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) + : name(n), + address(a), + helper(address), + receiver(pn_receiver(session, name.c_str())), + capacity(0), used(0) {} + +ReceiverContext::~ReceiverContext() +{ + if (receiver) pn_link_free(receiver); +} + +void ReceiverContext::setCapacity(uint32_t c) +{ + if (c != capacity) { + //stop + capacity = c; + //reissue credit + } +} + +uint32_t ReceiverContext::getCapacity() +{ + return capacity; +} + +uint32_t ReceiverContext::getAvailable() +{ + return pn_link_queued(receiver); +} + +uint32_t ReceiverContext::getUnsettled() +{ + assert(pn_link_unsettled(receiver) >= pn_link_queued(receiver)); + return pn_link_unsettled(receiver) - pn_link_queued(receiver); +} + +void ReceiverContext::close() +{ + if (receiver) pn_link_close(receiver); +} + +const std::string& ReceiverContext::getName() const +{ + return name; +} + +const std::string& ReceiverContext::getSource() const +{ + return address.getName(); +} +void ReceiverContext::verify() +{ + pn_terminus_t* source = pn_link_remote_source(receiver); + if (!pn_terminus_get_address(source)) { + std::string msg("No such source : "); + msg += getSource(); + QPID_LOG(debug, msg); + throw qpid::messaging::NotFound(msg); + } else if (AddressImpl::isTemporary(address)) { + address.setName(pn_terminus_get_address(source)); + QPID_LOG(debug, "Dynamic source name set to " << address.getName()); + } + helper.checkAssertion(source, AddressHelper::FOR_RECEIVER); +} +void ReceiverContext::configure() +{ + if (receiver) configure(pn_link_source(receiver)); +} +void ReceiverContext::configure(pn_terminus_t* source) +{ + helper.configure(receiver, source, AddressHelper::FOR_RECEIVER); + std::string option; + if (helper.getLinkTarget(option)) { + pn_terminus_set_address(pn_link_target(receiver), option.c_str()); + } else { + pn_terminus_set_address(pn_link_target(receiver), pn_terminus_get_address(pn_link_source(receiver))); + } +} + +Address ReceiverContext::getAddress() const +{ + return address; +} + +void ReceiverContext::reset(pn_session_t* session) +{ + receiver = session ? pn_receiver(session, name.c_str()) : 0; + if (receiver) configure(); +} + +bool ReceiverContext::hasCurrent() +{ + return receiver && pn_link_current(receiver); +} + +bool ReceiverContext::wakeupToIssueCredit() +{ + if (++used >= (capacity/2)) { + used = 0; + return true; + } else { + return false; + } +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h new file mode 100644 index 0000000000..dd1352aecb --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -0,0 +1,77 @@ +#ifndef QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H +#define QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" +#include "qpid/messaging/amqp/AddressHelper.h" +#include <string> +#include "qpid/sys/AtomicCount.h" +#include "qpid/sys/IntegerTypes.h" + +struct pn_link_t; +struct pn_session_t; +struct pn_terminus_t; + +namespace qpid { +namespace messaging { + +class Duration; +class Message; + +namespace amqp { + +/** + * + */ +class ReceiverContext +{ + public: + ReceiverContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& source); + virtual ~ReceiverContext(); + void reset(pn_session_t* session); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getAvailable(); + uint32_t getUnsettled(); + void attach(); + void close(); + const std::string& getName() const; + const std::string& getSource() const; + void configure(); + void verify(); + Address getAddress() const; + bool hasCurrent(); + private: + friend class ConnectionContext; + const std::string name; + Address address; + AddressHelper helper; + pn_link_t* receiver; + uint32_t capacity; + uint32_t used; + qpid::sys::AtomicCount fetching; + void configure(pn_terminus_t*); + bool wakeupToIssueCredit(); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp new file mode 100644 index 0000000000..bd7082079c --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReceiverHandle.h" +#include "ConnectionContext.h" +#include "SessionContext.h" +#include "SessionHandle.h" +#include "ReceiverContext.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Session.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +ReceiverHandle::ReceiverHandle(boost::shared_ptr<ConnectionContext> c, + boost::shared_ptr<SessionContext> s, + boost::shared_ptr<ReceiverContext> r +) : connection(c), session(s), receiver(r) {} + + +bool ReceiverHandle::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + return connection->get(session, receiver, message, timeout); +} + +qpid::messaging::Message ReceiverHandle::get(qpid::messaging::Duration timeout) +{ + qpid::messaging::Message result; + if (!get(result, timeout)) throw qpid::messaging::NoMessageAvailable(); + return result; +} + +bool ReceiverHandle::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + return connection->fetch(session, receiver, message, timeout); +} + +qpid::messaging::Message ReceiverHandle::fetch(qpid::messaging::Duration timeout) +{ + qpid::messaging::Message result; + if (!fetch(result, timeout)) throw qpid::messaging::NoMessageAvailable(); + return result; +} + +void ReceiverHandle::setCapacity(uint32_t capacity) +{ + connection->setCapacity(receiver, capacity); +} + +uint32_t ReceiverHandle::getCapacity() +{ + return connection->getCapacity(receiver); +} + +uint32_t ReceiverHandle::getAvailable() +{ + return connection->getAvailable(receiver); +} + +uint32_t ReceiverHandle::getUnsettled() +{ + return connection->getUnsettled(receiver); +} + +void ReceiverHandle::close() +{ + connection->detach(session, receiver); +} + +const std::string& ReceiverHandle::getName() const +{ + return receiver->getName(); +} + +qpid::messaging::Session ReceiverHandle::getSession() const +{ + //create new SessionHandle instance; i.e. create new handle that shares the same context + return qpid::messaging::Session(new SessionHandle(connection, session)); +} + +bool ReceiverHandle::isClosed() const +{ + return connection->isClosed(session, receiver); +} + +Address ReceiverHandle::getAddress() const +{ + return receiver->getAddress(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h new file mode 100644 index 0000000000..08a95fb585 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h @@ -0,0 +1,64 @@ +#ifndef QPID_MESSAGING_AMQP_RECEIVERHANDLE_H +#define QPID_MESSAGING_AMQP_RECEIVERHANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/ReceiverImpl.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +class SessionContext; +class ReceiverContext; +/** + * + */ +class ReceiverHandle : public qpid::messaging::ReceiverImpl +{ + public: + ReceiverHandle(boost::shared_ptr<ConnectionContext>, + boost::shared_ptr<SessionContext>, + boost::shared_ptr<ReceiverContext> + ); + bool get(Message& message, qpid::messaging::Duration timeout); + qpid::messaging::Message get(qpid::messaging::Duration timeout); + bool fetch(Message& message, qpid::messaging::Duration timeout); + qpid::messaging::Message fetch(qpid::messaging::Duration timeout); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getAvailable(); + uint32_t getUnsettled(); + void close(); + const std::string& getName() const; + qpid::messaging::Session getSession() const; + bool isClosed() const; + Address getAddress() const; + private: + boost::shared_ptr<ConnectionContext> connection; + boost::shared_ptr<SessionContext> session; + boost::shared_ptr<ReceiverContext> receiver; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_RECEIVERHANDLE_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp b/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp new file mode 100644 index 0000000000..e1c15c2c22 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp @@ -0,0 +1,186 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionContext.h" +#include "qpid/messaging/amqp/Sasl.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/sys/SecurityLayer.h" +#include "qpid/log/Statement.h" +#include "qpid/Sasl.h" +#include "qpid/SaslFactory.h" +#include "qpid/StringUtils.h" +#include <sstream> + +namespace qpid { +namespace messaging { +namespace amqp { + +Sasl::Sasl(const std::string& id, ConnectionContext& c, const std::string& hostname_) + : qpid::amqp::SaslClient(id), context(c), + sasl(qpid::SaslFactory::getInstance().create(c.username, c.password, c.service, hostname_, c.minSsf, c.maxSsf, false)), + hostname(hostname_), readHeader(true), writeHeader(true), haveOutput(false), state(NONE) {} + +Sasl::~Sasl() {} +std::size_t Sasl::decode(const char* buffer, std::size_t size) +{ + size_t decoded = 0; + if (readHeader) { + decoded += readProtocolHeader(buffer, size); + readHeader = !decoded; + } + if (state == NONE && decoded < size) { + decoded += read(buffer + decoded, size - decoded); + } + QPID_LOG(trace, id << " Sasl::decode(" << size << "): " << decoded); + return decoded; +} + +std::size_t Sasl::encode(char* buffer, std::size_t size) +{ + size_t encoded = 0; + if (writeHeader) { + encoded += writeProtocolHeader(buffer, size); + writeHeader = !encoded; + } + if (encoded < size) { + encoded += write(buffer + encoded, size - encoded); + } + haveOutput = (encoded == size); + QPID_LOG(trace, id << " Sasl::encode(" << size << "): " << encoded); + return encoded; +} + +bool Sasl::canEncode() +{ + QPID_LOG(trace, id << " Sasl::canEncode(): " << writeHeader << " || " << haveOutput); + return writeHeader || haveOutput; +} + +void Sasl::mechanisms(const std::string& offered) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-MECHANISMS(" << offered << ")"); + std::string response; + + std::string mechanisms; + if (context.mechanism.size()) { + std::vector<std::string> allowed = split(context.mechanism, " "); + std::vector<std::string> supported = split(offered, " "); + std::stringstream intersection; + for (std::vector<std::string>::const_iterator i = allowed.begin(); i != allowed.end(); ++i) { + if (std::find(supported.begin(), supported.end(), *i) != supported.end()) { + intersection << *i << " "; + } + } + mechanisms = intersection.str(); + } else { + mechanisms = offered; + } + + try { + if (sasl->start(mechanisms, response, context.getTransportSecuritySettings())) { + init(sasl->getMechanism(), &response, hostname.size() ? &hostname : 0); + } else { + init(sasl->getMechanism(), 0, hostname.size() ? &hostname : 0); + } + haveOutput = true; + context.activateOutput(); + } catch (const std::exception& e) { + failed(e.what()); + } +} +void Sasl::challenge(const std::string& challenge) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-CHALLENGE(" << challenge.size() << " bytes)"); + try { + std::string r = sasl->step(challenge); + response(&r); + haveOutput = true; + context.activateOutput(); + } catch (const std::exception& e) { + failed(e.what()); + } +} +namespace { +const std::string EMPTY; +} +void Sasl::challenge() +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-CHALLENGE(null)"); + try { + std::string r = sasl->step(EMPTY); + response(&r); + } catch (const std::exception& e) { + failed(e.what()); + } +} +void Sasl::outcome(uint8_t result, const std::string& extra) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-OUTCOME(" << result << ", " << extra << ")"); + outcome(result); +} +void Sasl::outcome(uint8_t result) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-OUTCOME(" << result << ")"); + if (result) state = FAILED; + else state = SUCCEEDED; + + securityLayer = sasl->getSecurityLayer(context.maxFrameSize); + if (securityLayer.get()) { + context.initSecurityLayer(*securityLayer); + } + context.activateOutput(); +} + +bool Sasl::stopReading() +{ + return state != NONE; +} + +qpid::sys::Codec* Sasl::getSecurityLayer() +{ + return securityLayer.get(); +} + +namespace { +const std::string DEFAULT_ERROR("Authentication failed"); +} + +bool Sasl::authenticated() +{ + switch (state) { + case SUCCEEDED: return true; + case FAILED: throw qpid::messaging::AuthenticationFailure(error.size() ? error : DEFAULT_ERROR); + case NONE: default: return false; + } +} + +void Sasl::failed(const std::string& text) +{ + QPID_LOG_CAT(info, client, id << " Failure during authentication: " << text); + error = text; + state = FAILED; +} + +std::string Sasl::getAuthenticatedUsername() +{ + return sasl->getUserId(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/Sasl.h b/qpid/cpp/src/qpid/messaging/amqp/Sasl.h new file mode 100644 index 0000000000..a836e2e465 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Sasl.h @@ -0,0 +1,77 @@ +#ifndef QPID_MESSAGING_AMQP_SASL_H +#define QPID_MESSAGING_AMQP_SASL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Codec.h" +#include "qpid/amqp/SaslClient.h" +#include <memory> + +namespace qpid { +class Sasl; +namespace sys { +class SecurityLayer; +} +namespace messaging { +struct ConnectionOptions; +namespace amqp { +class ConnectionContext; + +/** + * + */ +class Sasl : public qpid::sys::Codec, qpid::amqp::SaslClient +{ + public: + Sasl(const std::string& id, ConnectionContext& context, const std::string& hostname); + ~Sasl(); + std::size_t decode(const char* buffer, std::size_t size); + std::size_t encode(char* buffer, std::size_t size); + bool canEncode(); + + bool authenticated(); + qpid::sys::Codec* getSecurityLayer(); + std::string getAuthenticatedUsername(); + private: + ConnectionContext& context; + std::auto_ptr<qpid::Sasl> sasl; + std::string hostname; + bool readHeader; + bool writeHeader; + bool haveOutput; + enum { + NONE, FAILED, SUCCEEDED + } state; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; + std::string error; + + void mechanisms(const std::string&); + void challenge(const std::string&); + void challenge(); //null != empty string + void outcome(uint8_t result, const std::string&); + void outcome(uint8_t result); + void failed(const std::string&); + protected: + bool stopReading(); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SASL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp new file mode 100644 index 0000000000..5289fbdf9b --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -0,0 +1,643 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SenderContext.h" +#include "Transaction.h" +#include "EncodedMessage.h" +#include "PnData.h" +#include "util.h" +#include "qpid/messaging/AddressImpl.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/Exception.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/MapHandler.h" +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/log/Statement.h" +#include "config.h" +extern "C" { +#include <proton/engine.h> +} +#include <boost/shared_ptr.hpp> +#include <string.h> + +namespace qpid { +namespace messaging { +namespace amqp { + +//TODO: proper conversion to wide string for address +SenderContext::SenderContext(pn_session_t* session, const std::string& n, + const qpid::messaging::Address& a, + bool setToOnSend_, + const CoordinatorPtr& coord) + : sender(pn_sender(session, n.c_str())), + name(n), + address(a), + helper(address), + nextId(0), capacity(50), unreliable(helper.isUnreliable()), + setToOnSend(setToOnSend_), + transaction(coord) +{} + +SenderContext::~SenderContext() +{ + if (sender) pn_link_free(sender); +} + +void SenderContext::close() +{ + if (sender) pn_link_close(sender); +} + +void SenderContext::setCapacity(uint32_t c) +{ + if (c < deliveries.size()) throw qpid::messaging::SenderError("Desired capacity is less than unsettled message count!"); + capacity = c; +} + +uint32_t SenderContext::getCapacity() +{ + return capacity; +} + +uint32_t SenderContext::getUnsettled() +{ + return processUnsettled(true/*always allow retrieval of unsettled count, even if link has failed*/); +} + +const std::string& SenderContext::getName() const +{ + return name; +} + +const std::string& SenderContext::getTarget() const +{ + return address.getName(); +} + +bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out) +{ + resend();//if there are any messages needing to be resent at the front of the queue, send them first + if (processUnsettled(false) < capacity && pn_link_credit(sender)) { + types::Variant state; + if (transaction) + state = transaction->getSendState(); + if (unreliable) { + Delivery delivery(nextId++); + delivery.encode(MessageImplAccess::get(message), address, setToOnSend); + delivery.send(sender, unreliable, state); + *out = 0; + return true; + } else { + deliveries.push_back(Delivery(nextId++)); + try { + Delivery& delivery = deliveries.back(); + delivery.encode(MessageImplAccess::get(message), address, setToOnSend); + delivery.send(sender, unreliable, state); + *out = &delivery; + return true; + } catch (const std::exception& e) { + deliveries.pop_back(); + --nextId; + throw SendError(e.what()); + } + } + } else { + return false; + } +} + +void SenderContext::check() +{ + if (pn_link_state(sender) & PN_REMOTE_CLOSED && !(pn_link_state(sender) & PN_LOCAL_CLOSED)) { + std::string text = get_error_string(pn_link_remote_condition(sender), "Link detached by peer"); + pn_link_close(sender); + throw qpid::messaging::LinkError(text); + } +} + +uint32_t SenderContext::processUnsettled(bool silent) +{ + if (!silent) { + check(); + } + //remove messages from front of deque once peer has confirmed receipt + while (!deliveries.empty() && deliveries.front().delivered() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) { + deliveries.front().settle(); + deliveries.pop_front(); + } + return deliveries.size(); +} +namespace { +const std::string X_AMQP("x-amqp-"); +const std::string X_AMQP_FIRST_ACQUIRER("x-amqp-first-acquirer"); +const std::string X_AMQP_DELIVERY_COUNT("x-amqp-delivery-count"); +const std::string X_AMQP_0_10_APP_ID("x-amqp-0-10.app-id"); + +class HeaderAdapter : public qpid::amqp::MessageEncoder::Header +{ + public: + HeaderAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl), headers(msg.getHeaders()) {} + virtual bool isDurable() const + { + return msg.isDurable(); + } + virtual uint8_t getPriority() const + { + return msg.getPriority(); + } + virtual bool hasTtl() const + { + return msg.getTtl(); + } + virtual uint32_t getTtl() const + { + return msg.getTtl(); + } + virtual bool isFirstAcquirer() const + { + qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_FIRST_ACQUIRER); + if (i != headers.end()) { + return i->second; + } else { + return false; + } + } + virtual uint32_t getDeliveryCount() const + { + qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_DELIVERY_COUNT); + if (i != headers.end()) { + return i->second; + } else { + return msg.isRedelivered() ? 1 : 0; + } + } + private: + const qpid::messaging::MessageImpl& msg; + const qpid::types::Variant::Map& headers; +}; +const std::string EMPTY; +const std::string FORWARD_SLASH("/"); +const std::string X_AMQP_TO("x-amqp-to"); +const std::string X_AMQP_CONTENT_ENCODING("x-amqp-content-encoding"); +const std::string X_AMQP_CREATION_TIME("x-amqp-creation-time"); +const std::string X_AMQP_ABSOLUTE_EXPIRY_TIME("x-amqp-absolute-expiry-time"); +const std::string X_AMQP_GROUP_ID("x-amqp-group-id"); +const std::string X_AMQP_GROUP_SEQUENCE("x-amqp-group-sequence"); +const std::string X_AMQP_REPLY_TO_GROUP_ID("x-amqp-reply-to-group-id"); +const std::string X_AMQP_MESSAGE_ANNOTATIONS("x-amqp-message-annotations"); +const std::string X_AMQP_DELIVERY_ANNOTATIONS("x-amqp-delivery-annotations"); + +class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties +{ + public: + PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s, const std::string& t) : msg(impl), headers(msg.getHeaders()), subject(s), to(t) {} + bool hasMessageId() const + { + return getMessageId().size(); + } + std::string getMessageId() const + { + return msg.getMessageId(); + } + + bool hasUserId() const + { + return getUserId().size(); + } + + std::string getUserId() const + { + return msg.getUserId(); + } + + bool hasTo() const + { + return hasHeader(X_AMQP_TO) || !to.empty(); + } + + std::string getTo() const + { + qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_TO); + if (i == headers.end()) return to; + else return i->second; + } + + bool hasSubject() const + { + return subject.size() || getSubject().size(); + } + + std::string getSubject() const + { + return subject.size() ? subject : msg.getSubject(); + } + + bool hasReplyTo() const + { + return msg.getReplyTo(); + } + + std::string getReplyTo() const + { + Address a = msg.getReplyTo(); + if (a.getSubject().size()) { + return a.getName() + FORWARD_SLASH + a.getSubject(); + } else { + return a.getName(); + } + } + + bool hasCorrelationId() const + { + return getCorrelationId().size(); + } + + std::string getCorrelationId() const + { + return msg.getCorrelationId(); + } + + bool hasContentType() const + { + return getContentType().size(); + } + + std::string getContentType() const + { + return msg.getContentType(); + } + + bool hasContentEncoding() const + { + return hasHeader(X_AMQP_CONTENT_ENCODING); + } + + std::string getContentEncoding() const + { + return headers.find(X_AMQP_CONTENT_ENCODING)->second; + } + + bool hasAbsoluteExpiryTime() const + { + return hasHeader(X_AMQP_ABSOLUTE_EXPIRY_TIME); + } + + int64_t getAbsoluteExpiryTime() const + { + return headers.find(X_AMQP_ABSOLUTE_EXPIRY_TIME)->second; + } + + bool hasCreationTime() const + { + return hasHeader(X_AMQP_CREATION_TIME); + } + + int64_t getCreationTime() const + { + return headers.find(X_AMQP_CREATION_TIME)->second; + } + + bool hasGroupId() const + { + return hasHeader(X_AMQP_GROUP_ID); + } + + std::string getGroupId() const + { + return headers.find(X_AMQP_GROUP_ID)->second; + } + + bool hasGroupSequence() const + { + return hasHeader(X_AMQP_GROUP_SEQUENCE); + } + + uint32_t getGroupSequence() const + { + return headers.find(X_AMQP_GROUP_SEQUENCE)->second; + } + + bool hasReplyToGroupId() const + { + return hasHeader(X_AMQP_REPLY_TO_GROUP_ID); + } + + std::string getReplyToGroupId() const + { + return headers.find(X_AMQP_REPLY_TO_GROUP_ID)->second; + } + private: + const qpid::messaging::MessageImpl& msg; + const qpid::types::Variant::Map& headers; + const std::string subject; + const std::string to; + + bool hasHeader(const std::string& key) const + { + return headers.find(key) != headers.end(); + } +}; + +bool startsWith(const std::string& input, const std::string& pattern) +{ + if (input.size() < pattern.size()) return false; + for (std::string::const_iterator b = pattern.begin(), a = input.begin(); b != pattern.end(); ++b, ++a) { + if (*a != *b) return false; + } + return true; +} +class ApplicationPropertiesAdapter : public qpid::amqp::MessageEncoder::ApplicationProperties +{ + public: + ApplicationPropertiesAdapter(const qpid::types::Variant::Map& h) : headers(h) {} + void handle(qpid::amqp::MapHandler& h) const + { + for (qpid::types::Variant::Map::const_iterator i = headers.begin(); i != headers.end(); ++i) { + //strip out values with special keys as they are sent in standard fields + if (!startsWith(i->first, X_AMQP) || i->first == X_AMQP_0_10_APP_ID) { + qpid::amqp::CharSequence key(convert(i->first)); + switch (i->second.getType()) { + case qpid::types::VAR_VOID: + h.handleVoid(key); + break; + case qpid::types::VAR_BOOL: + h.handleBool(key, i->second); + break; + case qpid::types::VAR_UINT8: + h.handleUint8(key, i->second); + break; + case qpid::types::VAR_UINT16: + h.handleUint16(key, i->second); + break; + case qpid::types::VAR_UINT32: + h.handleUint32(key, i->second); + break; + case qpid::types::VAR_UINT64: + h.handleUint64(key, i->second); + break; + case qpid::types::VAR_INT8: + h.handleInt8(key, i->second); + break; + case qpid::types::VAR_INT16: + h.handleInt16(key, i->second); + break; + case qpid::types::VAR_INT32: + h.handleInt32(key, i->second); + break; + case qpid::types::VAR_INT64: + h.handleInt64(key, i->second); + break; + case qpid::types::VAR_FLOAT: + h.handleFloat(key, i->second); + break; + case qpid::types::VAR_DOUBLE: + h.handleDouble(key, i->second); + break; + case qpid::types::VAR_STRING: + h.handleString(key, convert(i->second), convert(i->second.getEncoding())); + break; + case qpid::types::VAR_UUID: + QPID_LOG(warning, "Skipping UUID in application properties; not yet handled correctly."); + break; + case qpid::types::VAR_MAP: + case qpid::types::VAR_LIST: + QPID_LOG(warning, "Skipping nested list and map; not allowed in application properties."); + break; + } + } + } + } + private: + const qpid::types::Variant::Map& headers; + + static qpid::amqp::CharSequence convert(const std::string& in) + { + qpid::amqp::CharSequence out; + out.data = in.data(); + out.size = in.size(); + return out; + } +}; + +bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) +{ + return address.getSubject().size() && address.getSubject() != msg.getSubject(); +} + +} + +SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0), presettled(false) {} + +void SenderContext::Delivery::reset() +{ + token = 0; +} + +void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address, bool setToField) +{ + try { + boost::shared_ptr<const EncodedMessage> original = msg.getEncoded(); + + if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered + //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received? + if (original->hasHeaderChanged(msg)) { + //since as yet have no annotations, just write the revised header then the rest of the message as received + encoded.resize(16/*max header size*/ + original->getBareMessage().size); + qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); + HeaderAdapter header(msg); + encoder.writeHeader(header); + ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size); + } else { + //since as yet have no annotations, if the header hasn't + //changed and we still have the original bare message, can + //send the entire content as is + encoded.resize(original->getSize()); + ::memcpy(encoded.getData(), original->getData(), original->getSize()); + } + } else { + HeaderAdapter header(msg); + PropertiesAdapter properties(msg, address.getSubject(), setToField ? address.getName() : EMPTY); + ApplicationPropertiesAdapter applicationProperties(msg.getHeaders()); + //compute size: + size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header) + + qpid::amqp::MessageEncoder::getEncodedSize(properties) + + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties); + if (msg.getContent().isVoid()) { + contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes()); + } else { + contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/; + } + encoded.resize(contentSize); + QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes") + qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); + //write header: + encoder.writeHeader(header); + //write delivery-annotations, write message-annotations (none yet supported) + //write properties + encoder.writeProperties(properties); + //write application-properties + encoder.writeApplicationProperties(applicationProperties); + //write body + if (!msg.getContent().isVoid()) { + //write as AmqpValue + encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE); + } else if (msg.getBytes().size()) { + encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported + } + if (encoder.getPosition() < encoded.getSize()) { + QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition()); + encoded.trim(encoder.getPosition()); + } + //write footer (no annotations yet supported) + } + } catch (const qpid::Exception& e) { + throw SendError(e.what()); + } +} + +void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state) +{ + pn_delivery_tag_t tag; + tag.size = sizeof(id); +#ifdef NO_PROTON_DELIVERY_TAG_T + tag.start = reinterpret_cast<const char*>(&id); +#else + tag.bytes = reinterpret_cast<const char*>(&id); +#endif + token = pn_delivery(sender, tag); + if (!state.isVoid()) { // Add transaction state + PnData data(pn_disposition_data(pn_delivery_local(token))); + data.put(state); + pn_delivery_update(token, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE); + } + pn_link_send(sender, encoded.getData(), encoded.getSize()); + if (unreliable) { + pn_delivery_settle(token); + presettled = true; + } + pn_link_advance(sender); +} + +bool SenderContext::Delivery::sent() const +{ + return presettled || token; +} +bool SenderContext::Delivery::delivered() +{ + if (presettled || (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token)))) { + //TODO: need a better means for signalling outcomes other than accepted + if (rejected()) { + QPID_LOG(warning, "delivery " << id << " was rejected by peer"); + } else if (!accepted()) { + QPID_LOG(info, "delivery " << id << " was not accepted by peer"); + } + return true; + } else { + return false; + } +} +bool SenderContext::Delivery::accepted() +{ + return pn_delivery_remote_state(token) == PN_ACCEPTED; +} +bool SenderContext::Delivery::rejected() +{ + return pn_delivery_remote_state(token) == PN_REJECTED; +} + +std::string SenderContext::Delivery::error() +{ + pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token)); + return (condition && pn_condition_is_set(condition)) ? + Msg() << get_error_string(condition, std::string(), std::string()) : + std::string(); +} + +void SenderContext::Delivery::settle() +{ + pn_delivery_settle(token); +} +void SenderContext::verify() +{ + pn_terminus_t* target = pn_link_remote_target(sender); + if (!pn_terminus_get_address(target)) { + std::string msg("No such target : "); + msg += getTarget(); + QPID_LOG(debug, msg); + throw qpid::messaging::NotFound(msg); + } else if (AddressImpl::isTemporary(address)) { + address.setName(pn_terminus_get_address(target)); + QPID_LOG(debug, "Dynamic target name set to " << address.getName()); + } + + helper.checkAssertion(target, AddressHelper::FOR_SENDER); +} + +void SenderContext::configure() +{ + if (sender) configure(pn_link_target(sender)); +} + +void SenderContext::configure(pn_terminus_t* target) +{ + helper.configure(sender, target, AddressHelper::FOR_SENDER); + std::string option; + if (helper.getLinkSource(option)) { + pn_terminus_set_address(pn_link_source(sender), option.c_str()); + } else { + pn_terminus_set_address(pn_link_source(sender), pn_terminus_get_address(pn_link_target(sender))); + } +} + +bool SenderContext::settled() +{ + return processUnsettled(false) == 0; +} + +bool SenderContext::closed() +{ + return pn_link_state(sender) & PN_LOCAL_CLOSED; +} + +Address SenderContext::getAddress() const +{ + return address; +} + + +void SenderContext::reset(pn_session_t* session) +{ + sender = session ? pn_sender(session, name.c_str()) : 0; + if (sender) configure(); + for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) + i->reset(); +} + +void SenderContext::resend() +{ + for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && pn_link_credit(sender) && !i->sent(); ++i) { + i->send(sender, false/*only resend reliable transfers*/); + } +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h new file mode 100644 index 0000000000..467a8e0d3d --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -0,0 +1,119 @@ +#ifndef QPID_MESSAGING_AMQP_SENDERCONTEXT_H +#define QPID_MESSAGING_AMQP_SENDERCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <deque> +#include <string> +#include <vector> +#include <boost/shared_ptr.hpp> +#include "qpid/sys/IntegerTypes.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/amqp/AddressHelper.h" +#include "qpid/messaging/amqp/EncodedMessage.h" + +struct pn_delivery_t; +struct pn_link_t; +struct pn_session_t; +struct pn_terminus_t; + +namespace qpid { +namespace messaging { + +class Message; +class MessageImpl; + +namespace amqp { + +class Transaction; + + +class SenderContext +{ + public: + class Delivery + { + public: + Delivery(int32_t id); + void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField); + void send(pn_link_t*, bool unreliable, const types::Variant& state=types::Variant()); + bool delivered(); + bool accepted(); + bool rejected(); + void settle(); + void reset(); + bool sent() const; + pn_delivery_t* getToken() const { return token; } + std::string error(); + private: + int32_t id; + pn_delivery_t* token; + EncodedMessage encoded; + bool presettled; + }; + + typedef boost::shared_ptr<Transaction> CoordinatorPtr; + + SenderContext(pn_session_t* session, const std::string& name, + const qpid::messaging::Address& target, + bool setToOnSend, + const CoordinatorPtr& transaction = CoordinatorPtr()); + virtual ~SenderContext(); + + virtual void reset(pn_session_t* session); + virtual void close(); + virtual void setCapacity(uint32_t); + virtual uint32_t getCapacity(); + virtual uint32_t getUnsettled(); + virtual const std::string& getName() const; + virtual const std::string& getTarget() const; + virtual bool send(const qpid::messaging::Message& message, Delivery**); + virtual void configure(); + virtual void verify(); + virtual void check(); + virtual bool settled(); + virtual bool closed(); + virtual Address getAddress() const; + + protected: + pn_link_t* sender; + + private: + friend class ConnectionContext; + typedef std::deque<Delivery> Deliveries; + + const std::string name; + qpid::messaging::Address address; + AddressHelper helper; + int32_t nextId; + Deliveries deliveries; + uint32_t capacity; + bool unreliable; + bool setToOnSend; + boost::shared_ptr<Transaction> transaction; + + uint32_t processUnsettled(bool silent); + void configure(pn_terminus_t*); + void resend(); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SENDERCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp new file mode 100644 index 0000000000..98f2d34e7d --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SenderHandle.h" +#include "ConnectionContext.h" +#include "SessionContext.h" +#include "SessionHandle.h" +#include "SenderContext.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Session.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +SenderHandle::SenderHandle(boost::shared_ptr<ConnectionContext> c, + boost::shared_ptr<SessionContext> s, + boost::shared_ptr<SenderContext> sndr +) : connection(c), session(s), sender(sndr) {} + +void SenderHandle::send(const Message& message, bool sync) +{ + SenderContext::Delivery* d = 0; + connection->send(session, sender, message, sync, &d); +} + +void SenderHandle::close() +{ + connection->detach(session, sender); +} + +void SenderHandle::setCapacity(uint32_t capacity) +{ + connection->setCapacity(sender, capacity); +} + +uint32_t SenderHandle::getCapacity() +{ + return connection->getCapacity(sender); +} + +uint32_t SenderHandle::getUnsettled() +{ + return connection->getUnsettled(sender); +} + +const std::string& SenderHandle::getName() const +{ + return sender->getName(); +} + +qpid::messaging::Session SenderHandle::getSession() const +{ + return qpid::messaging::Session(new SessionHandle(connection, session)); +} + +Address SenderHandle::getAddress() const +{ + return sender->getAddress(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h new file mode 100644 index 0000000000..fab158c1ef --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h @@ -0,0 +1,59 @@ +#ifndef QPID_MESSAGING_AMQP_SENDERHANDLE_H +#define QPID_MESSAGING_AMQP_SENDERHANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/SenderImpl.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +class SessionContext; +class SenderContext; +/** + * + */ +class SenderHandle : public qpid::messaging::SenderImpl +{ + public: + SenderHandle(boost::shared_ptr<ConnectionContext> connection, + boost::shared_ptr<SessionContext> session, + boost::shared_ptr<SenderContext> sender + ); + void send(const Message& message, bool sync); + void close(); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getUnsettled(); + const std::string& getName() const; + Session getSession() const; + Address getAddress() const; + private: + boost::shared_ptr<ConnectionContext> connection; + boost::shared_ptr<SessionContext> session; + boost::shared_ptr<SenderContext> sender; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SENDERHANDLE_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp new file mode 100644 index 0000000000..92bdea7dbc --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionContext.h" +#include "SenderContext.h" +#include "ReceiverContext.h" +#include "Transaction.h" +#include "PnData.h" +#include <boost/format.hpp> +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/log/Statement.h" +#include "qpid/amqp/descriptors.h" + +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { + +SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {} + +SessionContext::~SessionContext() +{ + // Clear all pointers to senders and receivers before we free the session. + senders.clear(); + receivers.clear(); + transaction.reset(); // Transaction is a sender. + if (!error && session) + pn_session_free(session); +} + +boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend) +{ + error.raise(); + std::string name = AddressHelper::getLinkName(address); + if (senders.find(name) != senders.end()) + throw LinkError("Link name must be unique within the scope of the connection"); + boost::shared_ptr<SenderContext> s( + new SenderContext(session, name, address, setToOnSend, transaction)); + senders[name] = s; + return s; +} + +boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address) +{ + error.raise(); + std::string name = AddressHelper::getLinkName(address); + if (receivers.find(name) != receivers.end()) throw LinkError("Link name must be unique within the scope of the connection"); + boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address)); + receivers[name] = r; + return r; +} + +boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const +{ + error.raise(); + SenderMap::const_iterator i = senders.find(name); + if (i == senders.end()) { + throw qpid::messaging::KeyError(std::string("No such sender") + name); + } else { + return i->second; + } +} + +boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const +{ + error.raise(); + ReceiverMap::const_iterator i = receivers.find(name); + if (i == receivers.end()) { + throw qpid::messaging::KeyError(std::string("No such receiver") + name); + } else { + return i->second; + } +} + +void SessionContext::removeReceiver(const std::string& n) +{ + error.raise(); + receivers.erase(n); +} + +void SessionContext::removeSender(const std::string& n) +{ + error.raise(); + senders.erase(n); +} + +boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver() +{ + error.raise(); + for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) { + if (i->second->hasCurrent()) { + return i->second; + } + } + + return boost::shared_ptr<ReceiverContext>(); +} + +uint32_t SessionContext::getReceivable() +{ + error.raise(); + return 0;//TODO +} + +uint32_t SessionContext::getUnsettledAcks() +{ + error.raise(); + return 0;//TODO +} + +qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) +{ + error.raise(); + qpid::framing::SequenceNumber id = next++; + if (!pn_delivery_settled(delivery)) { + unacked[id] = delivery; + QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); + pn_link_advance(pn_delivery_link(delivery)); + } else { + pn_delivery_settle(delivery); // Automatically advances the link. + } + return id; +} + +void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end) +{ + error.raise(); + for (DeliveryMap::iterator i = begin; i != end; ++i) { + types::Variant txState; + if (transaction) { + QPID_LOG(trace, "Setting disposition for transactional delivery " + << i->first << " -> " << i->second); + transaction->acknowledge(i->second); + } else { + QPID_LOG(trace, "Setting disposition for delivery " << i->first << " -> " << i->second); + pn_delivery_update(i->second, PN_ACCEPTED); + pn_delivery_settle(i->second); //TODO: different settlement modes? + } + } + unacked.erase(begin, end); +} + +void SessionContext::acknowledge() +{ + error.raise(); + QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages"); + acknowledge(unacked.begin(), unacked.end()); +} + +void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative) +{ + error.raise(); + QPID_LOG(debug, "acknowledging selected messages, id=" << id << ", cumulative=" << cumulative); + DeliveryMap::iterator i = unacked.find(id); + if (i != unacked.end()) { + DeliveryMap::iterator start = cumulative ? unacked.begin() : i; + acknowledge(start, ++i); + } else { + QPID_LOG(debug, "selective acknowledgement failed; message not found for id " << id); + } +} + +void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject) +{ + error.raise(); + DeliveryMap::iterator i = unacked.find(id); + if (i != unacked.end()) { + if (reject) { + QPID_LOG(debug, "rejecting message with id=" << id); + pn_delivery_update(i->second, PN_REJECTED); + } else { + QPID_LOG(debug, "releasing message with id=" << id); + pn_delivery_update(i->second, PN_MODIFIED); + pn_disposition_set_failed(pn_delivery_local(i->second), true); + } + pn_delivery_settle(i->second); + unacked.erase(i); + } +} + +bool SessionContext::settled() +{ + error.raise(); + bool result = true; + + for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { + try { + if (!i->second->closed() && !i->second->settled()) result = false; + } catch (const std::exception&) { + senders.erase(i); + throw; + } + } + return result; +} + +void SessionContext::setName(const std::string& n) +{ + name = n; +} +std::string SessionContext::getName() const +{ + return name; +} + +void SessionContext::reset(pn_connection_t* connection) +{ + unacked.clear(); + if (transaction) { + if (transaction->isCommitting()) + error = new TransactionUnknown("Transaction outcome unknown: transport failure"); + else + error = new TransactionAborted("Transaction aborted: transport failure"); + resetSession(0); + senders.clear(); + receivers.clear(); + transaction.reset(); + return; + } + resetSession(pn_session(connection)); + +} + +void SessionContext::resetSession(pn_session_t* session_) { + session = session_; + if (transaction) transaction->reset(session); + for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { + i->second->reset(session); + } + for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) { + i->second->reset(session); + } +} + + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h new file mode 100644 index 0000000000..67b3c1e401 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -0,0 +1,95 @@ +#ifndef QPID_MESSAGING_AMQP_SESSIONCONTEXT_H +#define QPID_MESSAGING_AMQP_SESSIONCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <map> +#include <string> +#include <boost/shared_ptr.hpp> +#include "qpid/sys/IntegerTypes.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/ExceptionHolder.h" + +struct pn_connection_t; +struct pn_session_t; +struct pn_delivery_t; + +namespace qpid { +namespace messaging { + +class Address; +class Duration; + +namespace amqp { + +class ConnectionContext; +class SenderContext; +class ReceiverContext; +class Transaction; + +/** + * + */ +class SessionContext +{ + public: + SessionContext(pn_connection_t*); + ~SessionContext(); + void reset(pn_connection_t*); + boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address, bool setToOnSend); + boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address); + boost::shared_ptr<SenderContext> getSender(const std::string& name) const; + boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const; + void removeReceiver(const std::string&); + void removeSender(const std::string&); + boost::shared_ptr<ReceiverContext> nextReceiver(); + uint32_t getReceivable(); + uint32_t getUnsettledAcks(); + bool settled(); + void setName(const std::string&); + std::string getName() const; + + void nack(const qpid::framing::SequenceNumber& id, bool reject); + + private: + friend class ConnectionContext; + typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap; + typedef std::map<std::string, boost::shared_ptr<ReceiverContext> > ReceiverMap; + typedef std::map<qpid::framing::SequenceNumber, pn_delivery_t*> DeliveryMap; + + pn_session_t* session; + SenderMap senders; + boost::shared_ptr<Transaction> transaction; + ReceiverMap receivers; + DeliveryMap unacked; + qpid::framing::SequenceNumber next; + std::string name; + sys::ExceptionHolder error; + + qpid::framing::SequenceNumber record(pn_delivery_t*); + void acknowledge(); + void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); + void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); + void resetSession(pn_session_t*); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SESSIONCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp new file mode 100644 index 0000000000..6b90d69c7f --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp @@ -0,0 +1,147 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionHandle.h" +#include "ConnectionContext.h" +#include "ConnectionHandle.h" +#include "ReceiverContext.h" +#include "ReceiverHandle.h" +#include "SenderContext.h" +#include "SenderHandle.h" +#include "SessionContext.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Session.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +SessionHandle::SessionHandle(boost::shared_ptr<ConnectionContext> c, boost::shared_ptr<SessionContext> s) : connection(c), session(s) {} + +void SessionHandle::commit() +{ + connection->commit(session); +} + +void SessionHandle::rollback() +{ + connection->rollback(session); +} + +void SessionHandle::acknowledge(bool /*sync*/) +{ + connection->acknowledge(session, 0, false); +} + +void SessionHandle::acknowledge(qpid::messaging::Message& msg, bool cumulative) +{ + connection->acknowledge(session, &msg, cumulative); +} + +void SessionHandle::reject(qpid::messaging::Message& msg) +{ + connection->nack(session, msg, true); +} + +void SessionHandle::release(qpid::messaging::Message& msg) +{ + connection->nack(session, msg, false); +} + +void SessionHandle::close() +{ + connection->endSession(session); +} + +void SessionHandle::sync(bool block) +{ + if (block) { + connection->sync(session); + } +} + +qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address) +{ + boost::shared_ptr<SenderContext> sender = connection->createSender(session, address); + return qpid::messaging::Sender(new SenderHandle(connection, session, sender)); +} + +qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::Address& address) +{ + boost::shared_ptr<ReceiverContext> receiver = connection->createReceiver(session, address); + return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver)); +} + +bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout) +{ + boost::shared_ptr<ReceiverContext> r = connection->nextReceiver(session, timeout); + if (r) { + //TODO: cache handles in this case to avoid frequent allocation + receiver = qpid::messaging::Receiver(new ReceiverHandle(connection, session, r)); + return true; + } else { + return false; + } +} + +qpid::messaging::Receiver SessionHandle::nextReceiver(Duration timeout) +{ + qpid::messaging::Receiver r; + if (nextReceiver(r, timeout)) return r; + else throw qpid::messaging::NoMessageAvailable(); +} + +uint32_t SessionHandle::getReceivable() +{ + return session->getReceivable(); +} + +uint32_t SessionHandle::getUnsettledAcks() +{ + return session->getUnsettledAcks(); +} + +Sender SessionHandle::getSender(const std::string& name) const +{ + return qpid::messaging::Sender(new SenderHandle(connection, session, connection->getSender(session, name))); +} + +Receiver SessionHandle::getReceiver(const std::string& name) const +{ + return qpid::messaging::Receiver(new ReceiverHandle(connection, session, connection->getReceiver(session, name))); +} + +Connection SessionHandle::getConnection() const +{ + return qpid::messaging::Connection(new ConnectionHandle(connection)); +} + +void SessionHandle::checkError() +{ + +} + + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.h b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.h new file mode 100644 index 0000000000..5e843aaacc --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.h @@ -0,0 +1,64 @@ +#ifndef QPID_MESSAGING_AMQP_SESSIONIMPL_H +#define QPID_MESSAGING_AMQP_SESSIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/SessionImpl.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +class SessionContext; +/** + * + */ +class SessionHandle : public qpid::messaging::SessionImpl +{ + public: + SessionHandle(boost::shared_ptr<ConnectionContext>, boost::shared_ptr<SessionContext>); + void commit(); + void rollback(); + void acknowledge(bool sync); + void acknowledge(Message&, bool); + void reject(Message&); + void release(Message&); + void close(); + void sync(bool block); + qpid::messaging::Sender createSender(const Address& address); + qpid::messaging::Receiver createReceiver(const Address& address); + bool nextReceiver(Receiver& receiver, Duration timeout); + qpid::messaging::Receiver nextReceiver(Duration timeout); + uint32_t getReceivable(); + uint32_t getUnsettledAcks(); + qpid::messaging::Sender getSender(const std::string& name) const; + qpid::messaging::Receiver getReceiver(const std::string& name) const; + qpid::messaging::Connection getConnection() const; + void checkError(); + private: + boost::shared_ptr<ConnectionContext> connection; + boost::shared_ptr<SessionContext> session; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SESSIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp b/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp new file mode 100644 index 0000000000..e8ef2d587b --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp @@ -0,0 +1,186 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SslTransport.h" +#include "TransportContext.h" +#include "qpid/messaging/ConnectionOptions.h" +#include "qpid/sys/ssl/SslSocket.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/Poller.h" +#include "qpid/client/ssl.h" +#include "qpid/log/Statement.h" +#include <boost/bind.hpp> +#include <boost/format.hpp> + +using namespace qpid::sys; +using namespace qpid::sys::ssl; + +namespace qpid { +namespace messaging { +namespace amqp { + +// Static constructor which registers connector here +namespace { +Transport* create(TransportContext& c, Poller::shared_ptr p) +{ + qpid::client::initialiseSSL(); + return new SslTransport(c, p); +} + +struct StaticInit +{ + StaticInit() + { + Transport::add("ssl", &create); + }; + + ~StaticInit() + { + qpid::client::shutdownSSL(); + } +} init; +} + + +SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), connector(0), aio(0), poller(p) +{ + const ConnectionOptions* options = context.getOptions(); + options->configureSocket(socket); + if (options->sslCertName != "") { + QPID_LOG(debug, "ssl-cert-name = " << options->sslCertName); + socket.setCertName(options->sslCertName); + } + if (options->sslIgnoreHostnameVerificationFailure) { + socket.ignoreHostnameVerificationFailure(); + } +} + +void SslTransport::connect(const std::string& host, const std::string& port) +{ + assert(!connector); + assert(!aio); + connector = AsynchConnector::create( + socket, + host, port, + boost::bind(&SslTransport::connected, this, _1), + boost::bind(&SslTransport::failed, this, _3)); + + connector->start(poller); +} + +void SslTransport::failed(const std::string& msg) +{ + QPID_LOG(debug, "Failed to connect: " << msg); + socket.close(); + context.closed(); +} + +void SslTransport::connected(const Socket&) +{ + context.opened(); + aio = AsynchIO::create(socket, + boost::bind(&SslTransport::read, this, _1, _2), + boost::bind(&SslTransport::eof, this, _1), + boost::bind(&SslTransport::disconnected, this, _1), + boost::bind(&SslTransport::socketClosed, this, _1, _2), + 0, // nobuffs + boost::bind(&SslTransport::write, this, _1)); + aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes + id = boost::str(boost::format("[%1%]") % socket.getFullAddress()); + aio->start(poller); +} + +void SslTransport::read(AsynchIO&, AsynchIO::BufferBase* buffer) +{ + int32_t decoded = context.getCodec().decode(buffer->bytes+buffer->dataStart, buffer->dataCount); + if (decoded < buffer->dataCount) { + // Adjust buffer for used bytes and then "unread them" + buffer->dataStart += decoded; + buffer->dataCount -= decoded; + aio->unread(buffer); + } else { + // Give whole buffer back to aio subsystem + aio->queueReadBuffer(buffer); + } +} + +void SslTransport::write(AsynchIO&) +{ + if (context.getCodec().canEncode()) { + AsynchIO::BufferBase* buffer = aio->getQueuedBuffer(); + if (buffer) { + size_t encoded = context.getCodec().encode(buffer->bytes, buffer->byteCount); + + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer); + } + } + +} + +void SslTransport::close() +{ + QPID_LOG(debug, id << " SslTransport closing..."); + if (aio) + aio->queueWriteClose(); +} + +void SslTransport::eof(AsynchIO&) +{ + close(); +} + +void SslTransport::disconnected(AsynchIO&) +{ + close(); + socketClosed(*aio, socket); +} + +void SslTransport::socketClosed(AsynchIO&, const Socket&) +{ + if (aio) + aio->queueForDeletion(); + context.closed(); + QPID_LOG(debug, id << " Socket closed"); +} + +void SslTransport::abort() +{ + if (aio) { + // Established connection + aio->requestCallback(boost::bind(&SslTransport::eof, this, _1)); + } +} + +void SslTransport::activateOutput() +{ + if (aio) aio->notifyPendingWrite(); +} + +const qpid::sys::SecuritySettings* SslTransport::getSecuritySettings() +{ + securitySettings.ssf = socket.getKeyLen(); + securitySettings.authid = "dummy";//set to non-empty string to enable external authentication + return &securitySettings; +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h b/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h new file mode 100644 index 0000000000..2972be4fac --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h @@ -0,0 +1,78 @@ +#ifndef QPID_MESSAGING_AMQP_SSLTRANSPORT_H +#define QPID_MESSAGING_AMQP_SSLTRANSPORT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/Transport.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/ssl/SslSocket.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace sys { +class ConnectionCodec; +class Poller; +class AsynchConnector; +class AsynchIO; +class AsynchIOBufferBase; +} + +namespace messaging { +namespace amqp { +class TransportContext; + +class SslTransport : public Transport +{ + public: + SslTransport(TransportContext&, boost::shared_ptr<qpid::sys::Poller> p); + + void connect(const std::string& host, const std::string& port); + + void activateOutput(); + void abort(); + void connectionEstablished() {}; + void close(); + const qpid::sys::SecuritySettings* getSecuritySettings(); + + private: + qpid::sys::ssl::SslSocket socket; + TransportContext& context; + qpid::sys::AsynchConnector* connector; + qpid::sys::AsynchIO* aio; + boost::shared_ptr<qpid::sys::Poller> poller; + bool closed; + std::string id; + qpid::sys::SecuritySettings securitySettings; + + void connected(const qpid::sys::Socket&); + void failed(const std::string& msg); + void read(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); + void write(qpid::sys::AsynchIO&); + void eof(qpid::sys::AsynchIO&); + void disconnected(qpid::sys::AsynchIO&); + void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&); + + friend class DriverImpl; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SSLTRANSPORT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp new file mode 100644 index 0000000000..a919e974d6 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp @@ -0,0 +1,185 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "TcpTransport.h" +#include "ConnectionContext.h" +#include "qpid/messaging/ConnectionOptions.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/Poller.h" +#include "qpid/log/Statement.h" +#include <boost/bind.hpp> +#include <boost/format.hpp> + +using namespace qpid::sys; + +namespace qpid { +namespace messaging { +namespace amqp { +// Static constructor which registers connector here +namespace { +Transport* create(TransportContext& c, Poller::shared_ptr p) +{ + return new TcpTransport(c, p); +} + +struct StaticInit +{ + StaticInit() + { + Transport::add("tcp", &create); + }; +} init; +} + +TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : socket(createSocket()), context(c), connector(0), aio(0), poller(p), closed(false) {} + +void TcpTransport::connect(const std::string& host, const std::string& port) +{ + assert(!connector); + assert(!aio); + context.getOptions()->configureSocket(*socket); + connector = AsynchConnector::create( + *socket, + host, port, + boost::bind(&TcpTransport::connected, this, _1), + boost::bind(&TcpTransport::failed, this, _3)); + + connector->start(poller); +} + +void TcpTransport::failed(const std::string& msg) +{ + QPID_LOG(debug, "Failed to connect: " << msg); + closed = true; + connector = 0; + socket->close(); + context.closed(); +} + +void TcpTransport::connected(const Socket&) +{ + context.opened(); + connector = 0; + aio = AsynchIO::create(*socket, + boost::bind(&TcpTransport::read, this, _1, _2), + boost::bind(&TcpTransport::eof, this, _1), + boost::bind(&TcpTransport::disconnected, this, _1), + boost::bind(&TcpTransport::socketClosed, this, _1, _2), + 0, // nobuffs + boost::bind(&TcpTransport::write, this, _1)); + aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes + id = boost::str(boost::format("[%1%]") % socket->getFullAddress()); + aio->start(poller); +} + +void TcpTransport::read(AsynchIO&, AsynchIO::BufferBase* buffer) +{ + int32_t decoded = context.getCodec().decode(buffer->bytes+buffer->dataStart, buffer->dataCount); + if (decoded < buffer->dataCount) { + // Adjust buffer for used bytes and then "unread them" + buffer->dataStart += decoded; + buffer->dataCount -= decoded; + aio->unread(buffer); + } else { + // Give whole buffer back to aio subsystem + aio->queueReadBuffer(buffer); + } +} + +void TcpTransport::write(AsynchIO&) +{ + if (context.getCodec().canEncode()) { + AsynchIO::BufferBase* buffer = aio->getQueuedBuffer(); + if (buffer) { + size_t encoded = context.getCodec().encode(buffer->bytes, buffer->byteCount); + + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer); + } + } + +} + +void TcpTransport::close() +{ + qpid::sys::Mutex::ScopedLock l(lock); + if (!closed) { + QPID_LOG(debug, id << " TcpTransport closing..."); + if (aio) + aio->queueWriteClose(); + } +} + +void TcpTransport::eof(AsynchIO&) +{ + close(); +} + +void TcpTransport::disconnected(AsynchIO&) +{ + close(); + socketClosed(*aio, *socket); +} + +void TcpTransport::socketClosed(AsynchIO&, const Socket&) +{ + bool notify(false); + { + qpid::sys::Mutex::ScopedLock l(lock); + if (!closed) { + closed = true; + if (aio) + aio->queueForDeletion(); + QPID_LOG(debug, id << " Socket closed"); + notify = true; + } //else has already been closed + } + if (notify) context.closed(); +} + +void TcpTransport::abort() +{ + qpid::sys::Mutex::ScopedLock l(lock); + if (!closed) { + if (aio) { + // Established connection + aio->requestCallback(boost::bind(&TcpTransport::eof, this, _1)); + } else if (connector) { + // We're still connecting + connector->stop(); + failed("Connection timedout"); + } + } +} + +void TcpTransport::activateOutput() +{ + qpid::sys::Mutex::ScopedLock l(lock); + if (!closed && aio) aio->notifyPendingWrite(); +} + +const qpid::sys::SecuritySettings* TcpTransport::getSecuritySettings() +{ + return 0; +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h new file mode 100644 index 0000000000..3e59ec97b2 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h @@ -0,0 +1,78 @@ +#ifndef QPID_MESSAGING_AMQP_TCPTRANSPORT_H +#define QPID_MESSAGING_AMQP_TCPTRANSPORT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/Transport.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Socket.h" +#include <boost/scoped_ptr.hpp> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace sys { +class ConnectionCodec; +class AsynchConnector; +class AsynchIO; +struct AsynchIOBufferBase; +class Poller; +} +namespace messaging { +namespace amqp { +class TransportContext; + +class TcpTransport : public Transport +{ + public: + TcpTransport(TransportContext&, boost::shared_ptr<qpid::sys::Poller>); + + void connect(const std::string& host, const std::string& port); + + void activateOutput(); + void abort(); + void connectionEstablished() {}; + void close(); + const qpid::sys::SecuritySettings* getSecuritySettings(); + + protected: + boost::scoped_ptr<qpid::sys::Socket> socket; + TransportContext& context; + qpid::sys::AsynchConnector* connector; + qpid::sys::AsynchIO* aio; + boost::shared_ptr<qpid::sys::Poller> poller; + std::string id; + + virtual ~TcpTransport() {} + virtual void connected(const qpid::sys::Socket&); + void failed(const std::string& msg); + void read(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); + void write(qpid::sys::AsynchIO&); + void eof(qpid::sys::AsynchIO&); + void disconnected(qpid::sys::AsynchIO&); + void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&); + + private: + bool closed; + qpid::sys::Mutex lock; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_TCPTRANSPORT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp b/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp new file mode 100644 index 0000000000..754b00d802 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "Transaction.h" +#include "SessionContext.h" +#include "ConnectionContext.h" +#include "PnData.h" +#include <proton/engine.h> +#include <qpid/Exception.h> +#include <qpid/amqp/descriptors.h> +#include <qpid/messaging/exceptions.h> +#include <qpid/log/Statement.h> +#include "qpid/messaging/Message.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +using namespace types; +using types::Exception; + +namespace { +const std::string LOCAL_TRANSACTIONS("amqp:local-transactions"); +const std::string TX_COORDINATOR("tx-transaction"); +const std::string ADDRESS("tx-transaction;{link:{reliability:at-least-once}}"); +} + +Transaction::Transaction(pn_session_t* session) : + SenderContext(session, TX_COORDINATOR, Address(ADDRESS), false), committing(false) +{} + +void Transaction::clear() { + id.clear(); + sendState.reset(); + acceptState.reset(); +} + +void Transaction::configure() { + SenderContext::configure(); + pn_terminus_t* target = pn_link_target(sender); + pn_terminus_set_type(target, PN_COORDINATOR); + PnData(pn_terminus_capabilities(target)).putSymbol(LOCAL_TRANSACTIONS); +} + +void Transaction::verify() {} + +const std::string& Transaction::getTarget() const { return getName(); } + +void Transaction::declare(SendFunction send, const SessionPtr& session) { + committing = false; + error.raise(); + clear(); + Variant declare = Variant::described(qpid::amqp::transaction::DECLARE_CODE, Variant::List()); + SenderContext::Delivery* delivery = 0; + send(session, shared_from_this(), Message(declare), true, &delivery); + setId(*delivery); +} + +void Transaction::discharge(SendFunction send, const SessionPtr& session, bool fail) { + error.raise(); + committing = !fail; + try { + // Send a discharge message to the remote coordinator. + Variant::List dischargeList; + dischargeList.push_back(Variant(id)); + dischargeList.push_back(Variant(fail)); + Variant discharge(dischargeList); + discharge.setDescriptor(qpid::amqp::transaction::DISCHARGE_CODE); + SenderContext::Delivery* delivery = 0; + send(session, shared_from_this(), Message(discharge), true, &delivery); + if (!delivery->accepted()) + throw TransactionAborted(delivery->error()); + committing = false; + } + catch(const TransactionError&) { + throw; + } + catch(const Exception& e) { + committing = false; + throw TransactionAborted(e.what()); + } +} + +// Set the transaction ID from the delivery returned by the remote coordinator. +void Transaction::setId(const SenderContext::Delivery& delivery) +{ + if (delivery.getToken() && + pn_delivery_remote_state(delivery.getToken()) == qpid::amqp::transaction::DECLARED_CODE) + { + pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery.getToken())); + if (data && pn_data_next(data)) { + size_t count = pn_data_get_list(data); + if (count > 0) { + pn_data_enter(data); + pn_data_next(data); + setId(PnData::string(pn_data_get_binary(data))); + pn_data_exit(data); + return; + } + } + } + throw TransactionError("No transaction ID returned by remote coordinator."); +} + +void Transaction::setId(const std::string& id_) { + id = id_; + if (id.empty()) { + clear(); + } + else { + // NOTE: The send and accept states are NOT described, the descriptor + // is added in pn_delivery_update. + Variant::List list; + list.push_back(Variant(id, "binary")); + sendState = Variant(list); + + Variant accepted = Variant::described(qpid::amqp::message::ACCEPTED_CODE, Variant::List()); + list.push_back(accepted); + acceptState = Variant(list); + } +} + +types::Variant Transaction::getSendState() const { + error.raise(); + return sendState; +} + +void Transaction::acknowledge(pn_delivery_t* delivery) +{ + error.raise(); + PnData data(pn_disposition_data(pn_delivery_local(delivery))); + data.put(acceptState); + pn_delivery_update(delivery, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE); + pn_delivery_settle(delivery); +} + + + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transaction.h b/qpid/cpp/src/qpid/messaging/amqp/Transaction.h new file mode 100644 index 0000000000..35492c9bb3 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Transaction.h @@ -0,0 +1,95 @@ +#ifndef COORDINATORCONTEXT_H +#define COORDINATORCONTEXT_H +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +#include "SenderContext.h" +#include <qpid/types/Variant.h> +#include "qpid/sys/ExceptionHolder.h" +#include <boost/enable_shared_from_this.hpp> +#include <boost/function.hpp> + +struct pn_session_t; + +namespace qpid { +namespace messaging { +namespace amqp { + +class SessionContext; +class ConnectionContext; + +/** + * Track the current transaction for a session. + * + * Implements SenderContext, to send transaction command messages to remote coordinator. + */ +class Transaction : public SenderContext, public boost::enable_shared_from_this<Transaction> { + public: + typedef boost::shared_ptr<SessionContext> SessionPtr; + + typedef boost::function<void (boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery)> SendFunction; + + Transaction(pn_session_t*); + + sys::ExceptionHolder error; + + /** Declare a transaction using connection and session to send to remote co-ordinator. */ + void declare(SendFunction, const SessionPtr& session); + + /** Discharge a transaction using connection and session to send to remote co-ordinator. + *@param fail: true means rollback, false means commit. + */ + void discharge(SendFunction, const SessionPtr& session, bool fail); + + /** Update a delivery with a transactional accept state. */ + void acknowledge(pn_delivery_t* delivery); + + /** Get delivery state to attach to transfers sent in a transaction. */ + types::Variant getSendState() const; + + /** Override SenderContext::getTarget with a more readable value */ + const std::string& getTarget() const; + + bool isCommitting() const { return committing; } + + protected: + // SenderContext overrides + void configure(); + void verify(); + + private: + std::string id; + types::Variant sendState; + types::Variant acceptState; + bool committing; + + + void clear(); + void setId(const SenderContext::Delivery& delivery); + void setId(const std::string& id); +}; + +}}} + +#endif diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transport.cpp b/qpid/cpp/src/qpid/messaging/amqp/Transport.cpp new file mode 100644 index 0000000000..21f51046b1 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Transport.cpp @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/Transport.h" +#include "qpid/messaging/amqp/TransportContext.h" +#include <map> +#include <string> + +namespace qpid { +namespace messaging { +namespace amqp { +namespace { +typedef std::map<std::string, Transport::Factory*> Registry; + +Registry& theRegistry() +{ + static Registry factories; + return factories; +} +} + +Transport* Transport::create(const std::string& name, TransportContext& context, boost::shared_ptr<qpid::sys::Poller> poller) +{ + Registry::const_iterator i = theRegistry().find(name); + if (i != theRegistry().end()) return (i->second)(context, poller); + else return 0; +} +void Transport::add(const std::string& name, Factory* factory) +{ + theRegistry()[name] = factory; +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transport.h b/qpid/cpp/src/qpid/messaging/amqp/Transport.h new file mode 100644 index 0000000000..6ec99ab58f --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Transport.h @@ -0,0 +1,52 @@ +#ifndef QPID_MESSAGING_AMQP_TRANSPORT_H +#define QPID_MESSAGING_AMQP_TRANSPORT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/CommonImportExport.h" +#include "qpid/sys/OutputControl.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace sys { +class Poller; +struct SecuritySettings; +} +namespace messaging { +namespace amqp { +class TransportContext; + +class Transport : public qpid::sys::OutputControl +{ + public: + virtual ~Transport() {} + virtual void connect(const std::string& host, const std::string& port) = 0; + virtual void close() = 0; + virtual void abort() = 0; + virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0; + + typedef Transport* Factory(TransportContext&, boost::shared_ptr<qpid::sys::Poller>); + QPID_COMMON_EXTERN static Transport* create(const std::string& name, TransportContext&, boost::shared_ptr<qpid::sys::Poller>); + QPID_COMMON_EXTERN static void add(const std::string& name, Factory* factory); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_TRANSPORT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/TransportContext.h b/qpid/cpp/src/qpid/messaging/amqp/TransportContext.h new file mode 100644 index 0000000000..df9add3e0b --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/TransportContext.h @@ -0,0 +1,50 @@ +#ifndef QPID_MESSAGING_AMQP_TRANSPORTCONTEXT_H +#define QPID_MESSAGING_AMQP_TRANSPORTCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace sys { +class Codec; +} +namespace messaging { +struct ConnectionOptions; + +namespace amqp { + +/** + * Interface to be supplied by 'users' of Transport interface, in + * order to provide codec and handle callbaskc for opening and closing + * of connection. + */ +class TransportContext +{ + public: + virtual ~TransportContext() {} + virtual qpid::sys::Codec& getCodec() = 0; + virtual const qpid::messaging::ConnectionOptions* getOptions() = 0; + virtual void closed() = 0; + virtual void opened() = 0; + private: +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_TRANSPORTCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/util.cpp b/qpid/cpp/src/qpid/messaging/amqp/util.cpp new file mode 100644 index 0000000000..870a89e364 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/util.cpp @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "util.h" +#include <sstream> + +namespace qpid { +namespace messaging { +namespace amqp { + +std::string get_error_string(pn_condition_t* error, const std::string& general, const std::string& delim) +{ + std::string name; + std::stringstream text; + if (pn_condition_is_set(error)) { + name = pn_condition_get_name(error); + text << general << delim << name; + const char* desc = pn_condition_get_description(error); + if (desc) text << ": " << desc; + } else { + text << general; + } + return text.str(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/util.h b/qpid/cpp/src/qpid/messaging/amqp/util.h new file mode 100644 index 0000000000..d10ef406dc --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/util.h @@ -0,0 +1,36 @@ +#ifndef QPID_MESSAGING_AMQP_UTIL_H +#define QPID_MESSAGING_AMQP_UTIL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +extern "C" { +#include <proton/engine.h> +} +namespace qpid { +namespace messaging { +namespace amqp { + +std::string get_error_string(pn_condition_t* error, const std::string& general, const std::string& delim = std::string(" with ")); + +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_UTIL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp b/qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp new file mode 100644 index 0000000000..5dbc13175f --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp @@ -0,0 +1,136 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/messaging/amqp/TcpTransport.h" +#include "qpid/messaging/amqp/TransportContext.h" +#include "qpid/messaging/ConnectionOptions.h" +#include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/Poller.h" +#include "qpid/log/Statement.h" +#include <boost/bind.hpp> +#include <boost/format.hpp> + +#include "qpid/sys/windows/check.h" +#include "qpid/sys/windows/util.h" +#include "qpid/sys/windows/SslAsynchIO.h" +#include "qpid/sys/windows/SslCredential.h" + +using namespace qpid::sys; + +namespace qpid { +namespace messaging { +namespace amqp { + +class SslTransport : public TcpTransport +{ + public: + SslTransport(TransportContext&, boost::shared_ptr<qpid::sys::Poller> p); + + void connect(const std::string& host, const std::string& port); + void negotiationDone(SECURITY_STATUS status); + const qpid::sys::SecuritySettings* getSecuritySettings(); + + private: + std::string brokerHost; + qpid::sys::windows::SslCredential sslCredential; + bool certLoaded; + qpid::sys::SecuritySettings securitySettings; + + void connected(const qpid::sys::Socket&); +}; + +// Static constructor which registers connector here +namespace { +Transport* create(TransportContext& c, Poller::shared_ptr p) +{ + return new SslTransport(c, p); +} + +struct StaticInit +{ + StaticInit() + { + Transport::add("ssl", &create); + }; +} init; +} + + +void SslTransport::negotiationDone(SECURITY_STATUS status) +{ + if (status == SEC_E_OK) { + connector = 0; + context.opened(); + id = boost::str(boost::format("[%1%]") % socket->getFullAddress()); + } else { + if (status == SEC_E_INCOMPLETE_CREDENTIALS && !certLoaded) { + // Server requested a client cert but we supplied none for the following reason: + failed(QPID_MSG(sslCredential.error())); + } + else + failed(QPID_MSG(qpid::sys::strError(status))); + } +} + +SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : TcpTransport(c, p) +{ + const ConnectionOptions* options = context.getOptions(); + if (options->sslIgnoreHostnameVerificationFailure) { + sslCredential.ignoreHostnameVerificationFailure(); + } + const std::string& name = (options->sslCertName != "") ? + options->sslCertName : qpid::sys::ssl::SslOptions::global.certName; + certLoaded = sslCredential.load(name); + QPID_LOG(debug, "SslTransport created"); +} + +void SslTransport::connect(const std::string& host, const std::string& port) +{ + brokerHost = host; + TcpTransport::connect(host, port); +} + +void SslTransport::connected(const Socket& s) +{ + aio = new qpid::sys::windows::ClientSslAsynchIO(brokerHost, + s, + sslCredential.handle(), + boost::bind(&SslTransport::read, this, _1, _2), + boost::bind(&SslTransport::eof, this, _1), + boost::bind(&SslTransport::disconnected, this, _1), + boost::bind(&SslTransport::socketClosed, this, _1, _2), + 0, // nobuffs + boost::bind(&SslTransport::write, this, _1), + boost::bind(&SslTransport::negotiationDone, this, _1)); + + aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes + aio->start(poller); +} + +const qpid::sys::SecuritySettings* SslTransport::getSecuritySettings() +{ + securitySettings.ssf = socket->getKeyLen(); + securitySettings.authid = "dummy";//set to non-empty string to enable external authentication + return &securitySettings; +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/exceptions.cpp b/qpid/cpp/src/qpid/messaging/exceptions.cpp new file mode 100644 index 0000000000..af8ab22251 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/exceptions.cpp @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/exceptions.h" + +namespace qpid { +namespace messaging { + +MessagingException::MessagingException(const std::string& msg) : qpid::types::Exception(msg) {} +MessagingException::~MessagingException() throw() {} + +InvalidOptionString::InvalidOptionString(const std::string& msg) : MessagingException(msg) {} +KeyError::KeyError(const std::string& msg) : MessagingException(msg) {} + + +LinkError::LinkError(const std::string& msg) : MessagingException(msg) {} + +AddressError::AddressError(const std::string& msg) : LinkError(msg) {} +ResolutionError::ResolutionError(const std::string& msg) : AddressError(msg) {} +MalformedAddress::MalformedAddress(const std::string& msg) : AddressError(msg) {} +AssertionFailed::AssertionFailed(const std::string& msg) : ResolutionError(msg) {} +NotFound::NotFound(const std::string& msg) : ResolutionError(msg) {} + +ReceiverError::ReceiverError(const std::string& msg) : LinkError(msg) {} +FetchError::FetchError(const std::string& msg) : ReceiverError(msg) {} +NoMessageAvailable::NoMessageAvailable() : FetchError("No message to fetch") {} + +SenderError::SenderError(const std::string& msg) : LinkError(msg) {} +SendError::SendError(const std::string& msg) : SenderError(msg) {} +MessageRejected::MessageRejected(const std::string& msg) : SendError(msg) {} +TargetCapacityExceeded::TargetCapacityExceeded(const std::string& msg) : SendError(msg) {} +OutOfCapacity::OutOfCapacity(const std::string& msg) : SendError(msg) {} + +SessionError::SessionError(const std::string& msg) : MessagingException(msg) {} +SessionClosed::SessionClosed() : SessionError("Session Closed") {} + +TransactionError::TransactionError(const std::string& msg) : SessionError(msg) {} +TransactionAborted::TransactionAborted(const std::string& msg) : TransactionError(msg) {} +TransactionUnknown::TransactionUnknown(const std::string& msg) : TransactionError(msg) {} +UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {} + +ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {} +ProtocolVersionError::ProtocolVersionError(const std::string& msg) : ConnectionError(msg) {} +AuthenticationFailure::AuthenticationFailure(const std::string& msg) : ConnectionError(msg) {} + +TransportFailure::TransportFailure(const std::string& msg) : MessagingException(msg) {} + +}} // namespace qpid::messaging |