diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/cpp/src/qpid/messaging | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/messaging')
19 files changed, 1652 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/messaging/Address.cpp b/qpid/cpp/src/qpid/messaging/Address.cpp new file mode 100644 index 0000000000..a516959edb --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Address.cpp @@ -0,0 +1,151 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" +#include "qpid/framing/Uuid.h" +#include <sstream> +#include <boost/format.hpp> + +namespace qpid { +namespace messaging { + +using namespace qpid::types; + +namespace { +const std::string SUBJECT_DIVIDER = "/"; +const std::string OPTIONS_DIVIDER = ";"; +const std::string SPACE = " "; +const std::string TYPE = "type"; +} +class AddressImpl +{ + public: + std::string name; + std::string subject; + Variant::Map options; + + AddressImpl() {} + AddressImpl(const std::string& n, const std::string& s, const Variant::Map& o) : + name(n), subject(s), options(o) {} +}; + +class AddressParser +{ + public: + AddressParser(const std::string&); + bool parse(Address& address); + private: + const std::string& input; + std::string::size_type current; + static const std::string RESERVED; + + bool readChar(char c); + bool readQuotedString(std::string& s); + bool readQuotedValue(Variant& value); + bool readString(std::string& value, char delimiter); + bool readWord(std::string& word, const std::string& delims = RESERVED); + bool readSimpleValue(Variant& word); + bool readKey(std::string& key); + bool readValue(Variant& value); + bool readKeyValuePair(Variant::Map& map); + bool readMap(Variant& value); + bool readList(Variant& value); + bool readName(std::string& name); + bool readSubject(std::string& subject); + bool error(const std::string& message); + bool eos(); + bool iswhitespace(); + bool in(const std::string& delims); + bool isreserved(); +}; + +Address::Address() : impl(new AddressImpl()) {} +Address::Address(const std::string& address) : impl(new AddressImpl()) +{ + AddressParser parser(address); + parser.parse(*this); +} +Address::Address(const std::string& name, const std::string& subject, const Variant::Map& options, + const std::string& type) + : impl(new AddressImpl(name, subject, options)) { setType(type); } +Address::Address(const Address& a) : + impl(new AddressImpl(a.impl->name, a.impl->subject, a.impl->options)) {} +Address::~Address() { delete impl; } + +Address& Address::operator=(const Address& a) { *impl = *a.impl; return *this; } + + +std::string Address::str() const +{ + std::stringstream out; + out << impl->name; + if (!impl->subject.empty()) out << SUBJECT_DIVIDER << impl->subject; + if (!impl->options.empty()) out << OPTIONS_DIVIDER << impl->options; + return out.str(); +} +Address::operator bool() const { return !impl->name.empty(); } +bool Address::operator !() const { return impl->name.empty(); } + +const std::string& Address::getName() const { return impl->name; } +void Address::setName(const std::string& name) { impl->name = name; } +const std::string& Address::getSubject() const { return impl->subject; } +void Address::setSubject(const std::string& subject) { impl->subject = subject; } +const Variant::Map& Address::getOptions() const { return impl->options; } +Variant::Map& Address::getOptions() { return impl->options; } +void Address::setOptions(const Variant::Map& options) { impl->options = options; } + + +namespace{ +const Variant EMPTY_VARIANT; +const std::string EMPTY_STRING; +const std::string NODE_PROPERTIES="node"; +} + +const Variant& find(const Variant::Map& map, const std::string& key) +{ + Variant::Map::const_iterator i = map.find(key); + if (i == map.end()) return EMPTY_VARIANT; + else return i->second; +} + +std::string Address::getType() const +{ + const Variant& props = find(impl->options, NODE_PROPERTIES); + if (props.getType() == VAR_MAP) { + const Variant& type = find(props.asMap(), TYPE); + if (!type.isVoid()) return type.asString(); + } + return EMPTY_STRING; +} + +void Address::setType(const std::string& type) +{ + Variant& props = impl->options[NODE_PROPERTIES]; + if (props.isVoid()) props = Variant::Map(); + props.asMap()[TYPE] = type; +} + +std::ostream& operator<<(std::ostream& out, const Address& address) +{ + out << address.str(); + return out; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/AddressParser.cpp b/qpid/cpp/src/qpid/messaging/AddressParser.cpp new file mode 100644 index 0000000000..4c8f35fbc5 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/AddressParser.cpp @@ -0,0 +1,265 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "AddressParser.h" +#include "qpid/framing/Uuid.h" +#include <boost/format.hpp> + +namespace qpid { +namespace messaging { + +using namespace qpid::types; + +AddressParser::AddressParser(const std::string& s) : input(s), current(0) {} + +bool AddressParser::error(const std::string& message) +{ + throw MalformedAddress((boost::format("%1%, character %2% of %3%") % message % current % input).str()); +} + +bool AddressParser::parse(Address& address) +{ + std::string name; + if (readName(name)) { + if (name.find('#') == 0) name = qpid::framing::Uuid(true).str() + name; + address.setName(name); + if (readChar('/')) { + std::string subject; + readSubject(subject); + address.setSubject(subject); + } + if (readChar(';')) { + Variant options = Variant::Map(); + if (readMap(options)) { + address.setOptions(options.asMap()); + } + } + //skip trailing whitespace + while (!eos() && iswhitespace()) ++current; + return eos() || error("Unexpected chars in address: " + input.substr(current)); + } else { + return input.empty() || error("Expected name"); + } +} + +bool AddressParser::parseMap(Variant::Map& map) +{ + if (readChar('{')) { + readMapEntries(map); + return readChar('}') || error("Unmatched '{'!"); + } else { + return false; + } +} + +bool AddressParser::parseList(Variant::List& list) +{ + if (readChar('[')) { + readListItems(list); + return readChar(']') || error("Unmatched '['!"); + } else { + return false; + } +} + + +bool AddressParser::readList(Variant& value) +{ + if (readChar('[')) { + value = Variant::List(); + readListItems(value.asList()); + return readChar(']') || error("Unmatched '['!"); + } else { + return false; + } +} + +void AddressParser::readListItems(Variant::List& list) +{ + Variant item; + while (readValueIfExists(item)) { + list.push_back(item); + if (!readChar(',')) break; + } +} + +bool AddressParser::readMap(Variant& value) +{ + if (readChar('{')) { + value = Variant::Map(); + readMapEntries(value.asMap()); + return readChar('}') || error("Unmatched '{'!"); + } else { + return false; + } +} + +void AddressParser::readMapEntries(Variant::Map& map) +{ + while (readKeyValuePair(map) && readChar(',')) {} +} + +bool AddressParser::readKeyValuePair(Variant::Map& map) +{ + std::string key; + Variant value; + if (readKey(key)) { + if (readChar(':') && readValue(value)) { + map[key] = value; + return true; + } else { + return error("Bad key-value pair, expected ':'"); + } + } else { + return false; + } +} + +bool AddressParser::readKey(std::string& key) +{ + return readWord(key) || readQuotedString(key); +} + +bool AddressParser::readValue(Variant& value) +{ + return readValueIfExists(value) || error("Expected value"); +} + +bool AddressParser::readValueIfExists(Variant& value) +{ + return readSimpleValue(value) || readQuotedValue(value) || + readMap(value) || readList(value); +} + +bool AddressParser::readString(std::string& value, char delimiter) +{ + if (readChar(delimiter)) { + std::string::size_type start = current++; + while (!eos()) { + if (input.at(current) == delimiter) { + if (current > start) { + value = input.substr(start, current - start); + } else { + value = ""; + } + ++current; + return true; + } else { + ++current; + } + } + return error("Unmatched delimiter"); + } else { + return false; + } +} + +bool AddressParser::readName(std::string& name) +{ + return readQuotedString(name) || readWord(name, "/;"); +} + +bool AddressParser::readSubject(std::string& subject) +{ + return readQuotedString(subject) || readWord(subject, ";"); +} + +bool AddressParser::readQuotedString(std::string& s) +{ + return readString(s, '"') || readString(s, '\''); +} + +bool AddressParser::readQuotedValue(Variant& value) +{ + std::string s; + if (readQuotedString(s)) { + value = s; + return true; + } else { + return false; + } +} + +bool AddressParser::readSimpleValue(Variant& value) +{ + std::string s; + if (readWord(s)) { + value.parse(s); + return true; + } else { + return false; + } +} + +bool AddressParser::readWord(std::string& value, const std::string& delims) +{ + //skip leading whitespace + while (!eos() && iswhitespace()) ++current; + + //read any number of non-whitespace, non-reserved chars into value + std::string::size_type start = current; + while (!eos() && !iswhitespace() && !in(delims)) ++current; + + if (current > start) { + value = input.substr(start, current - start); + return true; + } else { + return false; + } +} + +bool AddressParser::readChar(char c) +{ + while (!eos()) { + if (iswhitespace()) { + ++current; + } else if (input.at(current) == c) { + ++current; + return true; + } else { + return false; + } + } + return false; +} + +bool AddressParser::iswhitespace() +{ + return ::isspace(input.at(current)); +} + +bool AddressParser::isreserved() +{ + return in(RESERVED); +} + +bool AddressParser::in(const std::string& chars) +{ + return chars.find(input.at(current)) != std::string::npos; +} + +bool AddressParser::eos() +{ + return current >= input.size(); +} + +const std::string AddressParser::RESERVED = "\'\"{}[],:/"; + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/AddressParser.h b/qpid/cpp/src/qpid/messaging/AddressParser.h new file mode 100644 index 0000000000..1635331d19 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/AddressParser.h @@ -0,0 +1,66 @@ +#ifndef QPID_MESSAGING_ADDRESSPARSER_H +#define QPID_MESSAGING_ADDRESSPARSER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" + +namespace qpid { +namespace messaging { + +class AddressParser +{ + public: + AddressParser(const std::string&); + bool parse(Address& address); + bool parseMap(qpid::types::Variant::Map& map); + bool parseList(qpid::types::Variant::List& list); + private: + const std::string& input; + std::string::size_type current; + static const std::string RESERVED; + + bool readChar(char c); + bool readQuotedString(std::string& s); + bool readQuotedValue(qpid::types::Variant& value); + bool readString(std::string& value, char delimiter); + bool readWord(std::string& word, const std::string& delims = RESERVED); + bool readSimpleValue(qpid::types::Variant& word); + bool readKey(std::string& key); + bool readValue(qpid::types::Variant& value); + bool readValueIfExists(qpid::types::Variant& value); + bool readKeyValuePair(qpid::types::Variant::Map& map); + bool readMap(qpid::types::Variant& value); + bool readList(qpid::types::Variant& value); + bool readName(std::string& name); + bool readSubject(std::string& subject); + bool error(const std::string& message); + bool eos(); + bool iswhitespace(); + bool in(const std::string& delims); + bool isreserved(); + void readListItems(qpid::types::Variant::List& list); + void readMapEntries(qpid::types::Variant::Map& map); +}; + +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_ADDRESSPARSER_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp new file mode 100644 index 0000000000..bd90aa54a7 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Connection.cpp @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/messaging/ConnectionImpl.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/SessionImpl.h" +#include "qpid/messaging/PrivateImplRef.h" +#include "qpid/client/amqp0_10/ConnectionImpl.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace messaging { + +using namespace qpid::types; + +typedef PrivateImplRef<qpid::messaging::Connection> PI; + +Connection::Connection(ConnectionImpl* impl) { PI::ctor(*this, impl); } +Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::copy(*this, c); } +Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); } +Connection::~Connection() { PI::dtor(*this); } + +Connection::Connection(const std::string& url, const std::string& o) +{ + Variant::Map options; + AddressParser parser(o); + if (o.empty() || parser.parseMap(options)) { + PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); + } else { + throw InvalidOptionString("Invalid option string: " + o); + } +} +Connection::Connection(const std::string& url, const Variant::Map& options) +{ + PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); +} + +Connection::Connection() +{ + Variant::Map options; + std::string url = "amqp:tcp:127.0.0.1:5672"; + PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); +} + +void Connection::open() { impl->open(); } +bool Connection::isOpen() { return impl->isOpen(); } +bool Connection::isOpen() const { return impl->isOpen(); } +void Connection::close() { impl->close(); } +Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); } +Session Connection::createTransactionalSession(const std::string& name) +{ + return impl->newSession(true, name); +} +Session Connection::getSession(const std::string& name) const { return impl->getSession(name); } +void Connection::setOption(const std::string& name, const Variant& value) +{ + impl->setOption(name, value); +} +std::string Connection::getAuthenticatedUsername() +{ + return impl->getAuthenticatedUsername(); +} +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h new file mode 100644 index 0000000000..1e11d9a6d5 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h @@ -0,0 +1,52 @@ +#ifndef QPID_MESSAGING_CONNECTIONIMPL_H +#define QPID_MESSAGING_CONNECTIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/RefCounted.h" + +namespace qpid { + +namespace types { +class Variant; +} + +namespace messaging { + +class Session; + +class ConnectionImpl : public virtual qpid::RefCounted +{ + public: + virtual ~ConnectionImpl() {} + virtual void open() = 0; + virtual bool isOpen() const = 0; + virtual void close() = 0; + virtual Session newSession(bool transactional, const std::string& name) = 0; + virtual Session getSession(const std::string& name) const = 0; + virtual void setOption(const std::string& name, const qpid::types::Variant& value) = 0; + virtual std::string getAuthenticatedUsername() = 0; + private: +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_CONNECTIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Duration.cpp b/qpid/cpp/src/qpid/messaging/Duration.cpp new file mode 100644 index 0000000000..a23e9f5bcb --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Duration.cpp @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Duration.h" +#include <limits> + +namespace qpid { +namespace messaging { + +Duration::Duration(uint64_t ms) : milliseconds(ms) {} +uint64_t Duration::getMilliseconds() const { return milliseconds; } + +Duration operator*(const Duration& duration, uint64_t multiplier) +{ + return Duration(duration.getMilliseconds() * multiplier); +} + +Duration operator*(uint64_t multiplier, const Duration& duration) +{ + return Duration(duration.getMilliseconds() * multiplier); +} + +bool operator==(const Duration& a, const Duration& b) +{ + return a.getMilliseconds() == b.getMilliseconds(); +} + +bool operator!=(const Duration& a, const Duration& b) +{ + return a.getMilliseconds() != b.getMilliseconds(); +} + +const Duration Duration::FOREVER(std::numeric_limits<uint64_t>::max()); +const Duration Duration::IMMEDIATE(0); +const Duration Duration::SECOND(1000); +const Duration Duration::MINUTE(SECOND * 60); + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/FailoverUpdates.cpp b/qpid/cpp/src/qpid/messaging/FailoverUpdates.cpp new file mode 100644 index 0000000000..4f2fcf2e82 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/FailoverUpdates.cpp @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/FailoverUpdates.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Session.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include "qpid/log/Statement.h" +#include "qpid/Exception.h" +#include "qpid/Url.h" +#include "qpid/framing/Uuid.h" +#include <vector> + +namespace qpid { +namespace messaging { +using framing::Uuid; + +struct FailoverUpdatesImpl : qpid::sys::Runnable +{ + Connection connection; + Session session; + Receiver receiver; + qpid::sys::Thread thread; + + FailoverUpdatesImpl(Connection& c) : connection(c) + { + session = connection.createSession("failover-updates."+Uuid(true).str()); + receiver = session.createReceiver("amq.failover"); + thread = qpid::sys::Thread(*this); + } + + ~FailoverUpdatesImpl() { + try { + session.close(); + } catch(...) {} // Squash exceptions in a destructor. + thread.join(); + } + + void run() + { + try { + Message message; + while (receiver.fetch(message)) { + connection.setOption("reconnect-urls", message.getProperties()["amq.failover"]); + QPID_LOG(debug, "Set reconnect-urls to " << message.getProperties()["amq.failover"]); + session.acknowledge(); + } + } + catch (const ClosedException&) {} + catch (const qpid::TransportFailure& e) { + QPID_LOG(warning, "Failover updates stopped on loss of connection. " << e.what()); + } + catch (const std::exception& e) { + QPID_LOG(warning, "Failover updates stopped due to exception: " << e.what()); + } + } +}; + +FailoverUpdates::FailoverUpdates(Connection& connection) : impl(new FailoverUpdatesImpl(connection)) {} +FailoverUpdates::~FailoverUpdates() { if (impl) { delete impl; } } +FailoverUpdates::FailoverUpdates(const FailoverUpdates&) : impl(0) {} +FailoverUpdates& FailoverUpdates::operator=(const FailoverUpdates&) { return *this; } + + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/HandleInstantiator.cpp b/qpid/cpp/src/qpid/messaging/HandleInstantiator.cpp new file mode 100644 index 0000000000..c9a7680bb4 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/HandleInstantiator.cpp @@ -0,0 +1,64 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace messaging {
+
+using namespace qpid::types;
+
+void HandleInstantiatorDoNotCall(void)
+{
+ // This function exists to instantiate various template Handle
+ // bool functions. The instances are then available to
+ // the qpidmessaging DLL and subsequently exported.
+ // This function must not be exported nor called called.
+ // For further information refer to
+ // https://issues.apache.org/jira/browse/QPID-2926
+
+ Connection connection;
+ if (connection.isValid()) connection.close();
+ if (connection.isNull() ) connection.close();
+ if (connection ) connection.close();
+ if (!connection ) connection.close();
+
+ Receiver receiver;
+ if (receiver.isValid()) receiver.close();
+ if (receiver.isNull() ) receiver.close();
+ if (receiver ) receiver.close();
+ if (!receiver ) receiver.close();
+
+ Sender sender;
+ if (sender.isValid()) sender.close();
+ if (sender.isNull() ) sender.close();
+ if (sender ) sender.close();
+ if (!sender ) sender.close();
+
+ Session session;
+ if (session.isValid()) session.close();
+ if (session.isNull() ) session.close();
+ if (session ) session.close();
+ if (!session ) session.close();
+}
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/Message.cpp b/qpid/cpp/src/qpid/messaging/Message.cpp new file mode 100644 index 0000000000..83cdfd3c55 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Message.cpp @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/amqp_0_10/Codecs.h" +#include <boost/format.hpp> + +namespace qpid { +namespace messaging { + +using namespace qpid::types; + +Message::Message(const std::string& bytes) : impl(new MessageImpl(bytes)) {} +Message::Message(const char* bytes, size_t count) : impl(new MessageImpl(bytes, count)) {} + +Message::Message(const Message& m) : impl(new MessageImpl(*m.impl)) {} +Message::~Message() { delete impl; } + +Message& Message::operator=(const Message& m) { *impl = *m.impl; return *this; } + +void Message::setReplyTo(const Address& d) { impl->setReplyTo(d); } +const Address& Message::getReplyTo() const { return impl->getReplyTo(); } + +void Message::setSubject(const std::string& s) { impl->setSubject(s); } +const std::string& Message::getSubject() const { return impl->getSubject(); } + +void Message::setContentType(const std::string& s) { impl->setContentType(s); } +const std::string& Message::getContentType() const { return impl->getContentType(); } + +void Message::setMessageId(const std::string& id) { impl->messageId = id; } +const std::string& Message::getMessageId() const { return impl->messageId; } + +void Message::setUserId(const std::string& id) { impl->userId = id; } +const std::string& Message::getUserId() const { return impl->userId; } + +void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; } +const std::string& Message::getCorrelationId() const { return impl->correlationId; } + +uint8_t Message::getPriority() const { return impl->priority; } +void Message::setPriority(uint8_t priority) { impl->priority = priority; } + +void Message::setTtl(Duration ttl) { impl->ttl = ttl.getMilliseconds(); } +Duration Message::getTtl() const { return Duration(impl->ttl); } + +void Message::setDurable(bool durable) { impl->durable = durable; } +bool Message::getDurable() const { return impl->durable; } + +bool Message::getRedelivered() const { return impl->redelivered; } +void Message::setRedelivered(bool redelivered) { impl->redelivered = redelivered; } + +const Variant::Map& Message::getProperties() const { return impl->getHeaders(); } +Variant::Map& Message::getProperties() { return impl->getHeaders(); } +void Message::setProperty(const std::string& k, const qpid::types::Variant& v) { impl->setHeader(k,v); } + +void Message::setContent(const std::string& c) { impl->setBytes(c); } +void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); } +std::string Message::getContent() const { return impl->getBytes(); } + +const char* Message::getContentPtr() const +{ + return impl->getBytes().data(); +} + +size_t Message::getContentSize() const +{ + return impl->getBytes().size(); +} + +EncodingException::EncodingException(const std::string& msg) : qpid::types::Exception(msg) {} + +const std::string BAD_ENCODING("Unsupported encoding: %1% (only %2% is supported at present)."); + +template <class C> struct MessageCodec +{ + static bool checkEncoding(const std::string& requested) + { + if (requested.size()) { + if (requested == C::contentType) return true; + else throw EncodingException((boost::format(BAD_ENCODING) % requested % C::contentType).str()); + } else { + return false; + } + } + + /* + * Currently only support a single encoding type for both list and + * map, based on AMQP 0-10, though wider support is anticipated in the + * future. This method simply checks that the desired encoding (if one + * is specified, either through the message-content or through an + * override) is indeed supported. + */ + static void checkEncoding(const Message& message, const std::string& requested) + { + checkEncoding(requested) || checkEncoding(message.getContentType()); + } + + static void decode(const Message& message, typename C::ObjectType& object, const std::string& encoding) + { + checkEncoding(message, encoding); + C::decode(message.getContent(), object); + } + + static void encode(const typename C::ObjectType& map, Message& message, const std::string& encoding) + { + checkEncoding(message, encoding); + std::string content; + C::encode(map, content); + message.setContentType(C::contentType); + message.setContent(content); + } +}; + +void decode(const Message& message, Variant::Map& map, const std::string& encoding) +{ + MessageCodec<qpid::amqp_0_10::MapCodec>::decode(message, map, encoding); +} +void decode(const Message& message, Variant::List& list, const std::string& encoding) +{ + MessageCodec<qpid::amqp_0_10::ListCodec>::decode(message, list, encoding); +} +void encode(const Variant::Map& map, Message& message, const std::string& encoding) +{ + MessageCodec<qpid::amqp_0_10::MapCodec>::encode(map, message, encoding); +} +void encode(const Variant::List& list, Message& message, const std::string& encoding) +{ + MessageCodec<qpid::amqp_0_10::ListCodec>::encode(list, message, encoding); +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp new file mode 100644 index 0000000000..0601800e46 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MessageImpl.h" +#include "qpid/messaging/Message.h" + +namespace qpid { +namespace messaging { + +namespace { +const std::string EMPTY_STRING = ""; +} + +using namespace qpid::types; + +MessageImpl::MessageImpl(const std::string& c) : + priority(0), + ttl(0), + durable(false), + redelivered(false), + bytes(c), + internalId(0) {} +MessageImpl::MessageImpl(const char* chars, size_t count) : + priority(0), + ttl(0), + durable (false), + redelivered(false), + bytes(chars, count), + internalId(0) {} + +void MessageImpl::setReplyTo(const Address& d) { replyTo = d; } +const Address& MessageImpl::getReplyTo() const { return replyTo; } + +void MessageImpl::setSubject(const std::string& s) { subject = s; } +const std::string& MessageImpl::getSubject() const { return subject; } + +void MessageImpl::setContentType(const std::string& s) { contentType = s; } +const std::string& MessageImpl::getContentType() const { return contentType; } + +const Variant::Map& MessageImpl::getHeaders() const { return headers; } +Variant::Map& MessageImpl::getHeaders() { return headers; } +void MessageImpl::setHeader(const std::string& key, const qpid::types::Variant& val) { headers[key] = val; } + +//should these methods be on MessageContent? +void MessageImpl::setBytes(const std::string& c) { bytes = c; } +void MessageImpl::setBytes(const char* chars, size_t count) { bytes.assign(chars, count); } +const std::string& MessageImpl::getBytes() const { return bytes; } +std::string& MessageImpl::getBytes() { return bytes; } + +void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; } +qpid::framing::SequenceNumber MessageImpl::getInternalId() { return internalId; } + +MessageImpl& MessageImplAccess::get(Message& msg) +{ + return *msg.impl; +} +const MessageImpl& MessageImplAccess::get(const Message& msg) +{ + return *msg.impl; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.h b/qpid/cpp/src/qpid/messaging/MessageImpl.h new file mode 100644 index 0000000000..57df6b3fda --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.h @@ -0,0 +1,90 @@ +#ifndef QPID_MESSAGING_MESSAGEIMPL_H +#define QPID_MESSAGING_MESSAGEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" +#include "qpid/types/Variant.h" +#include "qpid/framing/SequenceNumber.h" + +namespace qpid { +namespace messaging { + +struct MessageImpl +{ + Address replyTo; + std::string subject; + std::string contentType; + std::string messageId; + std::string userId; + std::string correlationId; + uint8_t priority; + uint64_t ttl; + bool durable; + bool redelivered; + qpid::types::Variant::Map headers; + + std::string bytes; + + qpid::framing::SequenceNumber internalId; + + MessageImpl(const std::string& c); + MessageImpl(const char* chars, size_t count); + + void setReplyTo(const Address& d); + const Address& getReplyTo() const; + + void setSubject(const std::string& s); + const std::string& getSubject() const; + + void setContentType(const std::string& s); + const std::string& getContentType() const; + + const qpid::types::Variant::Map& getHeaders() const; + qpid::types::Variant::Map& getHeaders(); + void setHeader(const std::string& key, const qpid::types::Variant& val); + + void setBytes(const std::string& bytes); + void setBytes(const char* chars, size_t count); + const std::string& getBytes() const; + std::string& getBytes(); + + void setInternalId(qpid::framing::SequenceNumber id); + qpid::framing::SequenceNumber getInternalId(); + +}; + +class Message; + +/** + * Provides access to the internal MessageImpl for a message which is + * useful when accessing any message state not exposed directly + * through the public API. + */ +struct MessageImplAccess +{ + static MessageImpl& get(Message&); + static const MessageImpl& get(const Message&); +}; + +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_MESSAGEIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/PrivateImplRef.h b/qpid/cpp/src/qpid/messaging/PrivateImplRef.h new file mode 100644 index 0000000000..e77c58d071 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/PrivateImplRef.h @@ -0,0 +1,94 @@ +#ifndef QPID_MESSAGING_PRIVATEIMPL_H +#define QPID_MESSAGING_PRIVATEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/messaging/ImportExport.h" +#include <boost/intrusive_ptr.hpp> +#include "qpid/RefCounted.h" + +namespace qpid { +namespace messaging { + +/** + * Helper class to implement a class with a private, reference counted + * implementation and reference semantics. + * + * Such classes are used in the public API to hide implementation, they + * should. Example of use: + * + * === Foo.h + * + * template <class T> PrivateImplRef; + * class FooImpl; + * + * Foo : public Handle<FooImpl> { + * public: + * Foo(FooImpl* = 0); + * Foo(const Foo&); + * ~Foo(); + * Foo& operator=(const Foo&); + * + * int fooDo(); // and other Foo functions... + * + * private: + * typedef FooImpl Impl; + * Impl* impl; + * friend class PrivateImplRef<Foo>; + * + * === Foo.cpp + * + * typedef PrivateImplRef<Foo> PI; + * Foo::Foo(FooImpl* p) { PI::ctor(*this, p); } + * Foo::Foo(const Foo& c) : Handle<FooImpl>() { PI::copy(*this, c); } + * Foo::~Foo() { PI::dtor(*this); } + * Foo& Foo::operator=(const Foo& c) { return PI::assign(*this, c); } + * + * int foo::fooDo() { return impl->fooDo(); } + * + */ +template <class T> class PrivateImplRef { + public: + typedef typename T::Impl Impl; + typedef boost::intrusive_ptr<Impl> intrusive_ptr; + + /** Get the implementation pointer from a handle */ + static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); } + + /** Set the implementation pointer in a handle */ + static void set(T& t, const intrusive_ptr& p) { + if (t.impl == p) return; + if (t.impl) boost::intrusive_ptr_release(t.impl); + t.impl = p.get(); + if (t.impl) boost::intrusive_ptr_add_ref(t.impl); + } + + // Helper functions to implement the ctor, dtor, copy, assign + static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); } + static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); } + static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); } + static T& assign(T& t, const T& x) { set(t, get(x)); return t;} +}; + +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_PRIVATEIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Receiver.cpp b/qpid/cpp/src/qpid/messaging/Receiver.cpp new file mode 100644 index 0000000000..78e0c5daa3 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Receiver.cpp @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/ReceiverImpl.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace messaging { + +typedef PrivateImplRef<qpid::messaging::Receiver> PI; + +Receiver::Receiver(ReceiverImpl* impl) { PI::ctor(*this, impl); } +Receiver::Receiver(const Receiver& s) : Handle<ReceiverImpl>() { PI::copy(*this, s); } +Receiver::~Receiver() { PI::dtor(*this); } +Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); } +bool Receiver::get(Message& message, Duration timeout) { return impl->get(message, timeout); } +Message Receiver::get(Duration timeout) { return impl->get(timeout); } +bool Receiver::fetch(Message& message, Duration timeout) { return impl->fetch(message, timeout); } +Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); } +void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); } +uint32_t Receiver::getCapacity() { return impl->getCapacity(); } +uint32_t Receiver::getAvailable() { return impl->getAvailable(); } +uint32_t Receiver::getUnsettled() { return impl->getUnsettled(); } +void Receiver::close() { impl->close(); } +const std::string& Receiver::getName() const { return impl->getName(); } +Session Receiver::getSession() const { return impl->getSession(); } +bool Receiver::isClosed() const { return impl->isClosed(); } +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h new file mode 100644 index 0000000000..57059bfd28 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h @@ -0,0 +1,52 @@ +#ifndef QPID_MESSAGING_RECEIVERIMPL_H +#define QPID_MESSAGING_RECEIVERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/RefCounted.h" + +namespace qpid { +namespace messaging { + +class Message; +class MessageListener; +class Session; + +class ReceiverImpl : public virtual qpid::RefCounted +{ + public: + virtual ~ReceiverImpl() {} + virtual bool get(Message& message, Duration timeout) = 0; + virtual Message get(Duration timeout) = 0; + virtual bool fetch(Message& message, Duration timeout) = 0; + virtual Message fetch(Duration timeout) = 0; + virtual void setCapacity(uint32_t) = 0; + virtual uint32_t getCapacity() = 0; + virtual uint32_t getAvailable() = 0; + virtual uint32_t getUnsettled() = 0; + virtual void close() = 0; + virtual const std::string& getName() const = 0; + virtual Session getSession() const = 0; + virtual bool isClosed() const = 0; +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_RECEIVERIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Sender.cpp b/qpid/cpp/src/qpid/messaging/Sender.cpp new file mode 100644 index 0000000000..53dbb69777 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Sender.cpp @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/SenderImpl.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace messaging { +typedef PrivateImplRef<qpid::messaging::Sender> PI; + +Sender::Sender(SenderImpl* impl) { PI::ctor(*this, impl); } +Sender::Sender(const Sender& s) : qpid::messaging::Handle<SenderImpl>() { PI::copy(*this, s); } +Sender::~Sender() { PI::dtor(*this); } +Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); } +void Sender::send(const Message& message, bool sync) { impl->send(message, sync); } +void Sender::close() { impl->close(); } +void Sender::setCapacity(uint32_t c) { impl->setCapacity(c); } +uint32_t Sender::getCapacity() { return impl->getCapacity(); } +uint32_t Sender::getUnsettled() { return impl->getUnsettled(); } +uint32_t Sender::getAvailable() { return getCapacity() - getUnsettled(); } +const std::string& Sender::getName() const { return impl->getName(); } +Session Sender::getSession() const { return impl->getSession(); } + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/SenderImpl.h b/qpid/cpp/src/qpid/messaging/SenderImpl.h new file mode 100644 index 0000000000..a1ca02c72c --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/SenderImpl.h @@ -0,0 +1,47 @@ +#ifndef QPID_MESSAGING_SENDERIMPL_H +#define QPID_MESSAGING_SENDERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/RefCounted.h" + +namespace qpid { +namespace messaging { + +class Message; +class Session; + +class SenderImpl : public virtual qpid::RefCounted +{ + public: + virtual ~SenderImpl() {} + virtual void send(const Message& message, bool sync) = 0; + virtual void close() = 0; + virtual void setCapacity(uint32_t) = 0; + virtual uint32_t getCapacity() = 0; + virtual uint32_t getUnsettled() = 0; + virtual const std::string& getName() const = 0; + virtual Session getSession() const = 0; + private: +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_SENDERIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/Session.cpp b/qpid/cpp/src/qpid/messaging/Session.cpp new file mode 100644 index 0000000000..496953a8e5 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/Session.cpp @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/SessionImpl.h" +#include "qpid/messaging/PrivateImplRef.h" + +namespace qpid { +namespace messaging { + +typedef PrivateImplRef<qpid::messaging::Session> PI; + +Session::Session(SessionImpl* impl) { PI::ctor(*this, impl); } +Session::Session(const Session& s) : Handle<SessionImpl>() { PI::copy(*this, s); } +Session::~Session() { PI::dtor(*this); } +Session& Session::operator=(const Session& s) { return PI::assign(*this, s); } +void Session::commit() { impl->commit(); } +void Session::rollback() { impl->rollback(); } +void Session::acknowledge(bool sync) { impl->acknowledge(sync); } +void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m); sync(s); } +void Session::reject(Message& m) { impl->reject(m); } +void Session::release(Message& m) { impl->release(m); } +void Session::close() { impl->close(); } + +Sender Session::createSender(const Address& address) +{ + return impl->createSender(address); +} +Receiver Session::createReceiver(const Address& address) +{ + return impl->createReceiver(address); +} + +Sender Session::createSender(const std::string& address) +{ + return impl->createSender(Address(address)); +} +Receiver Session::createReceiver(const std::string& address) +{ + return impl->createReceiver(Address(address)); +} + +void Session::sync(bool block) +{ + impl->sync(block); +} + +bool Session::nextReceiver(Receiver& receiver, Duration timeout) +{ + return impl->nextReceiver(receiver, timeout); +} + + +Receiver Session::nextReceiver(Duration timeout) +{ + return impl->nextReceiver(timeout); +} + +uint32_t Session::getReceivable() { return impl->getReceivable(); } +uint32_t Session::getUnsettledAcks() { return impl->getUnsettledAcks(); } + +Sender Session::getSender(const std::string& name) const +{ + return impl->getSender(name); +} +Receiver Session::getReceiver(const std::string& name) const +{ + return impl->getReceiver(name); +} + +Connection Session::getConnection() const +{ + return impl->getConnection(); +} + +void Session::checkError() { impl->checkError(); } +bool Session::hasError() +{ + try { + checkError(); + return false; + } catch (const std::exception&) { + return true; + } +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/SessionImpl.h b/qpid/cpp/src/qpid/messaging/SessionImpl.h new file mode 100644 index 0000000000..02a254e4f2 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/SessionImpl.h @@ -0,0 +1,63 @@ +#ifndef QPID_MESSAGING_SESSIONIMPL_H +#define QPID_MESSAGING_SESSIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/RefCounted.h" +#include <string> +#include "qpid/messaging/Duration.h" + +namespace qpid { +namespace messaging { + +class Address; +class Connection; +class Message; +class Sender; +class Receiver; + +class SessionImpl : public virtual qpid::RefCounted +{ + public: + virtual ~SessionImpl() {} + virtual void commit() = 0; + virtual void rollback() = 0; + virtual void acknowledge(bool sync) = 0; + virtual void acknowledge(Message&) = 0; + virtual void reject(Message&) = 0; + virtual void release(Message&) = 0; + virtual void close() = 0; + virtual void sync(bool block) = 0; + virtual Sender createSender(const Address& address) = 0; + virtual Receiver createReceiver(const Address& address) = 0; + virtual bool nextReceiver(Receiver& receiver, Duration timeout) = 0; + virtual Receiver nextReceiver(Duration timeout) = 0; + virtual uint32_t getReceivable() = 0; + virtual uint32_t getUnsettledAcks() = 0; + virtual Sender getSender(const std::string& name) const = 0; + virtual Receiver getReceiver(const std::string& name) const = 0; + virtual Connection getConnection() const = 0; + virtual void checkError() = 0; + private: +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_SESSIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/exceptions.cpp b/qpid/cpp/src/qpid/messaging/exceptions.cpp new file mode 100644 index 0000000000..5d2683fffe --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/exceptions.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/exceptions.h" + +namespace qpid { +namespace messaging { + +MessagingException::MessagingException(const std::string& msg) : qpid::types::Exception(msg) {} +MessagingException::~MessagingException() throw() {} + +InvalidOptionString::InvalidOptionString(const std::string& msg) : MessagingException(msg) {} +KeyError::KeyError(const std::string& msg) : MessagingException(msg) {} + + +LinkError::LinkError(const std::string& msg) : MessagingException(msg) {} + +AddressError::AddressError(const std::string& msg) : LinkError(msg) {} +ResolutionError::ResolutionError(const std::string& msg) : AddressError(msg) {} +MalformedAddress::MalformedAddress(const std::string& msg) : AddressError(msg) {} +AssertionFailed::AssertionFailed(const std::string& msg) : ResolutionError(msg) {} +NotFound::NotFound(const std::string& msg) : ResolutionError(msg) {} + +ReceiverError::ReceiverError(const std::string& msg) : LinkError(msg) {} +FetchError::FetchError(const std::string& msg) : ReceiverError(msg) {} +NoMessageAvailable::NoMessageAvailable() : FetchError("No message to fetch") {} + +SenderError::SenderError(const std::string& msg) : LinkError(msg) {} +SendError::SendError(const std::string& msg) : SenderError(msg) {} +TargetCapacityExceeded::TargetCapacityExceeded(const std::string& msg) : SendError(msg) {} + +SessionError::SessionError(const std::string& msg) : MessagingException(msg) {} +TransactionError::TransactionError(const std::string& msg) : SessionError(msg) {} +TransactionAborted::TransactionAborted(const std::string& msg) : TransactionError(msg) {} +UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {} + +ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {} + +TransportFailure::TransportFailure(const std::string& msg) : MessagingException(msg) {} + +}} // namespace qpid::messaging |