diff options
author | Gordon Sim <gsim@apache.org> | 2010-01-28 08:37:37 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-01-28 08:37:37 +0000 |
commit | e74eaed0bc3665bc38d7cbedce85f85536f92824 (patch) | |
tree | f55ff48efa8e2b43c3bf3e0f1d4003b4d7fbe88f /cpp | |
parent | a5318490afdca4c9a16329f2a0e2f9ded0813f36 (diff) | |
download | qpid-python-e74eaed0bc3665bc38d7cbedce85f85536f92824.tar.gz |
QPID-664: change format of connection options string to match address options; make open() a non-static method.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@904000 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
24 files changed, 410 insertions, 255 deletions
diff --git a/cpp/examples/messaging/client.cpp b/cpp/examples/messaging/client.cpp index 49d5441bf0..4d68d7c575 100644 --- a/cpp/examples/messaging/client.cpp +++ b/cpp/examples/messaging/client.cpp @@ -40,7 +40,8 @@ int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; try { - Connection connection = Connection::open(url); + Connection connection; + connection.open(url); Session session = connection.newSession(); Sender sender = session.createSender("service_queue"); diff --git a/cpp/examples/messaging/drain.cpp b/cpp/examples/messaging/drain.cpp index 3834aa3dfd..21c7df7388 100644 --- a/cpp/examples/messaging/drain.cpp +++ b/cpp/examples/messaging/drain.cpp @@ -42,6 +42,7 @@ struct Options : public qpid::Options bool help; std::string url; std::string address; + std::string connectionOptions; int64_t timeout; bool forever; qpid::log::Options log; @@ -59,6 +60,7 @@ struct Options : public qpid::Options ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from") ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting") ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever") + ("connection-options", qpid::optValue(connectionOptions,"OPTIONS"), "connection options string in the form {name1=value1, name2=value2}") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -96,7 +98,8 @@ int main(int argc, char** argv) Options options(argv[0]); if (options.parse(argc, argv)) { try { - Connection connection = Connection::open(options.url); + Connection connection(options.connectionOptions); + connection.open(options.url); Session session = connection.newSession(); Receiver receiver = session.createReceiver(options.address); Duration timeout = options.getTimeout(); diff --git a/cpp/examples/messaging/map_receiver.cpp b/cpp/examples/messaging/map_receiver.cpp index 0106ce878c..05be4090d2 100644 --- a/cpp/examples/messaging/map_receiver.cpp +++ b/cpp/examples/messaging/map_receiver.cpp @@ -39,7 +39,8 @@ int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; try { - Connection connection = Connection::open(url); + Connection connection; + connection.open(url); Session session = connection.newSession(); Receiver receiver = session.createReceiver("message_queue"); Message message = receiver.fetch(); diff --git a/cpp/examples/messaging/map_sender.cpp b/cpp/examples/messaging/map_sender.cpp index 50150fa3d5..b6e0621844 100644 --- a/cpp/examples/messaging/map_sender.cpp +++ b/cpp/examples/messaging/map_sender.cpp @@ -39,7 +39,8 @@ int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; try { - Connection connection = Connection::open(url); + Connection connection; + connection.open(url); Session session = connection.newSession(); Sender sender = session.createSender("message_queue"); diff --git a/cpp/examples/messaging/queue_receiver.cpp b/cpp/examples/messaging/queue_receiver.cpp index 058a4e19ae..192b90088d 100644 --- a/cpp/examples/messaging/queue_receiver.cpp +++ b/cpp/examples/messaging/queue_receiver.cpp @@ -32,9 +32,8 @@ int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; try { - Variant::Map options; - if (argc>2) parseOptionString(argv[2], options); - Connection connection = Connection::open(url, options); + Connection connection; + connection.open(url); Session session = connection.newSession(); Receiver receiver = session.createReceiver("message_queue"); while (true) { diff --git a/cpp/examples/messaging/queue_sender.cpp b/cpp/examples/messaging/queue_sender.cpp index 1396e26d5c..b2535d90bf 100644 --- a/cpp/examples/messaging/queue_sender.cpp +++ b/cpp/examples/messaging/queue_sender.cpp @@ -35,7 +35,8 @@ int main(int argc, char** argv) { int count = argc>2 ? atoi(argv[2]) : 10; try { - Connection connection = Connection::open(url); + Connection connection; + connection.open(url); Session session = connection.newSession(); Sender sender = session.createSender("message_queue"); diff --git a/cpp/examples/messaging/server.cpp b/cpp/examples/messaging/server.cpp index 137323dd03..0a80a5fb02 100644 --- a/cpp/examples/messaging/server.cpp +++ b/cpp/examples/messaging/server.cpp @@ -42,7 +42,8 @@ int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; try { - Connection connection = Connection::open(url); + Connection connection; + connection.open(url); Session session = connection.newSession(); Receiver receiver = session.createReceiver("service_queue; {create: always}"); diff --git a/cpp/examples/messaging/spout.cpp b/cpp/examples/messaging/spout.cpp index 62d72a8043..cbb6b52b34 100644 --- a/cpp/examples/messaging/spout.cpp +++ b/cpp/examples/messaging/spout.cpp @@ -56,6 +56,7 @@ struct Options : public qpid::Options string_vector properties; string_vector entries; std::string content; + std::string connectionOptions; qpid::log::Options log; Options(const std::string& argv0=std::string()) @@ -76,6 +77,7 @@ struct Options : public qpid::Options ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") ("map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content") ("content", qpid::optValue(content, "CONTENT"), "specify textual content") + ("connection-options", qpid::optValue(connectionOptions,"OPTIONS"), "connection options string in the form {name1=value1, name2=value2}") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -155,7 +157,8 @@ int main(int argc, char** argv) Options options(argv[0]); if (options.parse(argc, argv)) { try { - Connection connection = Connection::open(options.url); + Connection connection(options.connectionOptions); + connection.open(options.url); Session session = connection.newSession(); Sender sender = session.createSender(options.address); diff --git a/cpp/examples/messaging/topic_receiver.cpp b/cpp/examples/messaging/topic_receiver.cpp index 321231f93d..13f881e574 100644 --- a/cpp/examples/messaging/topic_receiver.cpp +++ b/cpp/examples/messaging/topic_receiver.cpp @@ -35,7 +35,8 @@ int main(int argc, char** argv) { const std::string pattern = argc>2 ? argv[2] : "#.#"; try { - Connection connection = Connection::open(url); + Connection connection; + connection.open(url); Session session = connection.newSession(); Receiver receiver = session.createReceiver("news_service; {filter:[control, " + pattern + "]}"); while (true) { diff --git a/cpp/examples/messaging/topic_sender.cpp b/cpp/examples/messaging/topic_sender.cpp index 5665fc45f9..d1ada45864 100644 --- a/cpp/examples/messaging/topic_sender.cpp +++ b/cpp/examples/messaging/topic_sender.cpp @@ -52,7 +52,8 @@ int main(int argc, char** argv) { int count = argc>2 ? atoi(argv[2]) : 10; try { - Connection connection = Connection::open(url); + Connection connection; + connection.open(url); Session session = connection.newSession(); Sender sender = session.createSender("news_service"); diff --git a/cpp/include/qpid/messaging/Connection.h b/cpp/include/qpid/messaging/Connection.h index 5c5246ff82..36392da0b2 100644 --- a/cpp/include/qpid/messaging/Connection.h +++ b/cpp/include/qpid/messaging/Connection.h @@ -38,9 +38,16 @@ namespace messaging { class ConnectionImpl; class Session; +struct InvalidOptionString : public qpid::Exception +{ + InvalidOptionString(const std::string& msg); +}; + class Connection : public qpid::client::Handle<ConnectionImpl> { public: + QPID_CLIENT_EXTERN Connection(ConnectionImpl* impl); + QPID_CLIENT_EXTERN Connection(const Connection&); /** * Current implementation supports the following options: * @@ -70,12 +77,11 @@ class Connection : public qpid::client::Handle<ConnectionImpl> * * */ - static QPID_CLIENT_EXTERN Connection open(const std::string& url, const Variant::Map& options = Variant::Map()); - - QPID_CLIENT_EXTERN Connection(ConnectionImpl* impl = 0); - QPID_CLIENT_EXTERN Connection(const Connection&); + QPID_CLIENT_EXTERN Connection(const Variant::Map& options = Variant::Map()); + QPID_CLIENT_EXTERN Connection(const std::string& options); QPID_CLIENT_EXTERN ~Connection(); QPID_CLIENT_EXTERN Connection& operator=(const Connection&); + QPID_CLIENT_EXTERN void open(const std::string& url); QPID_CLIENT_EXTERN void close(); QPID_CLIENT_EXTERN Session newSession(bool transactional, const std::string& name = std::string()); QPID_CLIENT_EXTERN Session newSession(const std::string& name = std::string()); @@ -87,19 +93,6 @@ class Connection : public qpid::client::Handle<ConnectionImpl> }; -struct InvalidOptionString : public qpid::Exception -{ - InvalidOptionString(const std::string& msg); -}; - -/** - * TODO: need to change format of connection option string (currently - * name1=value1&name2=value2 etc, should probably use map syntax as - * per address options. - */ -QPID_CLIENT_EXTERN void parseOptionString(const std::string&, Variant::Map&); -QPID_CLIENT_EXTERN Variant::Map parseOptionString(const std::string&); - }} // namespace qpid::messaging #endif /*!QPID_MESSAGING_CONNECTION_H*/ diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 1727de6eb0..6570740872 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -639,6 +639,8 @@ set (qpidclient_SOURCES qpid/client/SubscriptionManagerImpl.cpp qpid/client/TCPConnector.cpp qpid/messaging/Address.cpp + qpid/messaging/AddressParser.h + qpid/messaging/AddressParser.cpp qpid/messaging/Connection.cpp qpid/messaging/ConnectionImpl.h qpid/messaging/ListContent.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 0e1b330245..58214d42c6 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -702,6 +702,8 @@ libqpidclient_la_SOURCES = \ qpid/client/TCPConnector.cpp \ qpid/client/TCPConnector.h \ qpid/messaging/Address.cpp \ + qpid/messaging/AddressParser.h \ + qpid/messaging/AddressParser.cpp \ qpid/messaging/Connection.cpp \ qpid/messaging/ListContent.cpp \ qpid/messaging/ListView.cpp \ diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index cd5c0214e3..4242850192 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -61,8 +61,8 @@ void convert(const Variant::Map& from, ConnectionSettings& to) setIfFound(from, "bounds", to.bounds); } -ConnectionImpl::ConnectionImpl(const std::string& u, const Variant::Map& options) : - url(u), reconnectionEnabled(true), timeout(-1), +ConnectionImpl::ConnectionImpl(const Variant::Map& options) : + reconnectionEnabled(true), timeout(-1), minRetryInterval(1), maxRetryInterval(30) { QPID_LOG(debug, "Opening connection to " << url << " with " << options); @@ -71,6 +71,11 @@ ConnectionImpl::ConnectionImpl(const std::string& u, const Variant::Map& options setIfFound(options, "reconnection-timeout", timeout); setIfFound(options, "min-retry-interval", minRetryInterval); setIfFound(options, "max-retry-interval", maxRetryInterval); +} + +void ConnectionImpl::open(const std::string& u) +{ + url = u; connection.open(url, settings); } diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 979cc6c82a..d9d0d1e065 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -40,7 +40,8 @@ class SessionImpl; class ConnectionImpl : public qpid::messaging::ConnectionImpl { public: - ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options); + ConnectionImpl(const qpid::messaging::Variant::Map& options); + void open(const std::string& url); void close(); qpid::messaging::Session newSession(bool transactional, const std::string& name); qpid::messaging::Session getSession(const std::string& name) const; diff --git a/cpp/src/qpid/messaging/Address.cpp b/cpp/src/qpid/messaging/Address.cpp index ff72f62705..057196a957 100644 --- a/cpp/src/qpid/messaging/Address.cpp +++ b/cpp/src/qpid/messaging/Address.cpp @@ -156,207 +156,4 @@ InvalidAddress::InvalidAddress(const std::string& msg) : Exception(msg) {} MalformedAddress::MalformedAddress(const std::string& msg) : Exception(msg) {} -AddressParser::AddressParser(const std::string& s) : input(s), current(0) {} - -bool AddressParser::error(const std::string& message) -{ - throw MalformedAddress((boost::format("%1%, character %2% of %3%") % message % current % input).str()); -} - -bool AddressParser::parse(Address& address) -{ - std::string name; - if (readName(name)) { - if (name.find('#') == 0) name = qpid::framing::Uuid(true).str() + name; - address.setName(name); - if (readChar('/')) { - std::string subject; - if (readSubject(subject)) { - address.setSubject(subject); - } else { - return error("Expected subject after /"); - } - } - if (readChar(';')) { - Variant options = Variant::Map(); - if (readMap(options)) { - address.setOptions(options.asMap()); - } - } - //skip trailing whitespace - while (!eos() && iswhitespace()) ++current; - return eos() || error("Unexpected chars in address: " + input.substr(current)); - } else { - return input.empty() || error("Expected name"); - } -} - -bool AddressParser::readList(Variant& value) -{ - if (readChar('[')) { - value = Variant::List(); - Variant item; - while (readValue(item)) { - value.asList().push_back(item); - if (!readChar(',')) break; - } - return readChar(']') || error("Unmatched '['!"); - } else { - return false; - } -} - -bool AddressParser::readMap(Variant& value) -{ - if (readChar('{')) { - value = Variant::Map(); - while (readKeyValuePair(value.asMap()) && readChar(',')) {} - return readChar('}') || error("Unmatched '{'!"); - } else { - return false; - } -} - -bool AddressParser::readKeyValuePair(Variant::Map& map) -{ - std::string key; - Variant value; - if (readKey(key)) { - if (readChar(':') && readValue(value)) { - map[key] = value; - return true; - } else { - return error("Bad key-value pair, expected ':'"); - } - } else { - return false; - } -} - -bool AddressParser::readKey(std::string& key) -{ - return readWord(key); -} - -bool AddressParser::readValue(Variant& value) -{ - return readSimpleValue(value) || readQuotedValue(value) || - readMap(value) || readList(value) || error("Expected value"); -} - -bool AddressParser::readString(std::string& value, char delimiter) -{ - if (readChar(delimiter)) { - std::string::size_type start = current++; - while (!eos()) { - if (input.at(current) == delimiter) { - if (current > start) { - value = input.substr(start, current - start); - } else { - value = ""; - } - ++current; - return true; - } else { - ++current; - } - } - return error("Unmatched delimiter"); - } else { - return false; - } -} - -bool AddressParser::readName(std::string& name) -{ - return readQuotedString(name) || readWord(name, "/;"); -} - -bool AddressParser::readSubject(std::string& subject) -{ - return readQuotedString(subject) || readWord(subject, ";"); -} - -bool AddressParser::readQuotedString(std::string& s) -{ - return readString(s, '"') || readString(s, '\''); -} - -bool AddressParser::readQuotedValue(Variant& value) -{ - std::string s; - if (readQuotedString(s)) { - value = s; - return true; - } else { - return false; - } -} - -bool AddressParser::readSimpleValue(Variant& value) -{ - std::string s; - if (readWord(s)) { - value = s; - try { value = value.asInt64(); return true; } catch (const InvalidConversion&) {} - try { value = value.asDouble(); return true; } catch (const InvalidConversion&) {} - return true; - } else { - return false; - } -} - -bool AddressParser::readWord(std::string& value, const std::string& delims) -{ - //skip leading whitespace - while (!eos() && iswhitespace()) ++current; - - //read any number of non-whitespace, non-reserved chars into value - std::string::size_type start = current; - while (!eos() && !iswhitespace() && !in(delims)) ++current; - - if (current > start) { - value = input.substr(start, current - start); - return true; - } else { - return false; - } -} - -bool AddressParser::readChar(char c) -{ - while (!eos()) { - if (iswhitespace()) { - ++current; - } else if (input.at(current) == c) { - ++current; - return true; - } else { - return false; - } - } - return false; -} - -bool AddressParser::iswhitespace() -{ - return ::isspace(input.at(current)); -} - -bool AddressParser::isreserved() -{ - return in(RESERVED); -} - -bool AddressParser::in(const std::string& chars) -{ - return chars.find(input.at(current)) != std::string::npos; -} - -bool AddressParser::eos() -{ - return current >= input.size(); -} - -const std::string AddressParser::RESERVED = "\'\"{}[],:/"; }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/AddressParser.cpp b/cpp/src/qpid/messaging/AddressParser.cpp new file mode 100644 index 0000000000..265b5fe195 --- /dev/null +++ b/cpp/src/qpid/messaging/AddressParser.cpp @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "AddressParser.h" +#include "qpid/framing/Uuid.h" +#include <boost/format.hpp> + +namespace qpid { +namespace messaging { + +AddressParser::AddressParser(const std::string& s) : input(s), current(0) {} + +bool AddressParser::error(const std::string& message) +{ + throw MalformedAddress((boost::format("%1%, character %2% of %3%") % message % current % input).str()); +} + +bool AddressParser::parse(Address& address) +{ + std::string name; + if (readName(name)) { + if (name.find('#') == 0) name = qpid::framing::Uuid(true).str() + name; + address.setName(name); + if (readChar('/')) { + std::string subject; + if (readSubject(subject)) { + address.setSubject(subject); + } else { + return error("Expected subject after /"); + } + } + if (readChar(';')) { + Variant options = Variant::Map(); + if (readMap(options)) { + address.setOptions(options.asMap()); + } + } + //skip trailing whitespace + while (!eos() && iswhitespace()) ++current; + return eos() || error("Unexpected chars in address: " + input.substr(current)); + } else { + return input.empty() || error("Expected name"); + } +} + +bool AddressParser::parseMap(Variant::Map& map) +{ + if (readChar('{')) { + readMapEntries(map); + return readChar('}') || error("Unmatched '{'!"); + } else { + return false; + } +} + +bool AddressParser::parseList(Variant::List& list) +{ + if (readChar('[')) { + readListItems(list); + return readChar(']') || error("Unmatched '['!"); + } else { + return false; + } +} + + +bool AddressParser::readList(Variant& value) +{ + if (readChar('[')) { + value = Variant::List(); + readListItems(value.asList()); + return readChar(']') || error("Unmatched '['!"); + } else { + return false; + } +} + +void AddressParser::readListItems(Variant::List& list) +{ + Variant item; + while (readValue(item)) { + list.push_back(item); + if (!readChar(',')) break; + } +} + +bool AddressParser::readMap(Variant& value) +{ + if (readChar('{')) { + value = Variant::Map(); + readMapEntries(value.asMap()); + return readChar('}') || error("Unmatched '{'!"); + } else { + return false; + } +} + +void AddressParser::readMapEntries(Variant::Map& map) +{ + while (readKeyValuePair(map) && readChar(',')) {} +} + +bool AddressParser::readKeyValuePair(Variant::Map& map) +{ + std::string key; + Variant value; + if (readKey(key)) { + if (readChar(':') && readValue(value)) { + map[key] = value; + return true; + } else { + return error("Bad key-value pair, expected ':'"); + } + } else { + return false; + } +} + +bool AddressParser::readKey(std::string& key) +{ + return readWord(key); +} + +bool AddressParser::readValue(Variant& value) +{ + return readSimpleValue(value) || readQuotedValue(value) || + readMap(value) || readList(value) || error("Expected value"); +} + +bool AddressParser::readString(std::string& value, char delimiter) +{ + if (readChar(delimiter)) { + std::string::size_type start = current++; + while (!eos()) { + if (input.at(current) == delimiter) { + if (current > start) { + value = input.substr(start, current - start); + } else { + value = ""; + } + ++current; + return true; + } else { + ++current; + } + } + return error("Unmatched delimiter"); + } else { + return false; + } +} + +bool AddressParser::readName(std::string& name) +{ + return readQuotedString(name) || readWord(name, "/;"); +} + +bool AddressParser::readSubject(std::string& subject) +{ + return readQuotedString(subject) || readWord(subject, ";"); +} + +bool AddressParser::readQuotedString(std::string& s) +{ + return readString(s, '"') || readString(s, '\''); +} + +bool AddressParser::readQuotedValue(Variant& value) +{ + std::string s; + if (readQuotedString(s)) { + value = s; + return true; + } else { + return false; + } +} + +bool AddressParser::readSimpleValue(Variant& value) +{ + std::string s; + if (readWord(s)) { + value = s; + try { value = value.asInt64(); return true; } catch (const InvalidConversion&) {} + try { value = value.asDouble(); return true; } catch (const InvalidConversion&) {} + return true; + } else { + return false; + } +} + +bool AddressParser::readWord(std::string& value, const std::string& delims) +{ + //skip leading whitespace + while (!eos() && iswhitespace()) ++current; + + //read any number of non-whitespace, non-reserved chars into value + std::string::size_type start = current; + while (!eos() && !iswhitespace() && !in(delims)) ++current; + + if (current > start) { + value = input.substr(start, current - start); + return true; + } else { + return false; + } +} + +bool AddressParser::readChar(char c) +{ + while (!eos()) { + if (iswhitespace()) { + ++current; + } else if (input.at(current) == c) { + ++current; + return true; + } else { + return false; + } + } + return false; +} + +bool AddressParser::iswhitespace() +{ + return ::isspace(input.at(current)); +} + +bool AddressParser::isreserved() +{ + return in(RESERVED); +} + +bool AddressParser::in(const std::string& chars) +{ + return chars.find(input.at(current)) != std::string::npos; +} + +bool AddressParser::eos() +{ + return current >= input.size(); +} + +const std::string AddressParser::RESERVED = "\'\"{}[],:/"; + +}} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/AddressParser.h b/cpp/src/qpid/messaging/AddressParser.h new file mode 100644 index 0000000000..801b5cead1 --- /dev/null +++ b/cpp/src/qpid/messaging/AddressParser.h @@ -0,0 +1,65 @@ +#ifndef QPID_MESSAGING_ADDRESSPARSER_H +#define QPID_MESSAGING_ADDRESSPARSER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" + +namespace qpid { +namespace messaging { + +class AddressParser +{ + public: + AddressParser(const std::string&); + bool parse(Address& address); + bool parseMap(Variant::Map& map); + bool parseList(Variant::List& list); + private: + const std::string& input; + std::string::size_type current; + static const std::string RESERVED; + + bool readChar(char c); + bool readQuotedString(std::string& s); + bool readQuotedValue(Variant& value); + bool readString(std::string& value, char delimiter); + bool readWord(std::string& word, const std::string& delims = RESERVED); + bool readSimpleValue(Variant& word); + bool readKey(std::string& key); + bool readValue(Variant& value); + bool readKeyValuePair(Variant::Map& map); + bool readMap(Variant& value); + bool readList(Variant& value); + bool readName(std::string& name); + bool readSubject(std::string& subject); + bool error(const std::string& message); + bool eos(); + bool iswhitespace(); + bool in(const std::string& delims); + bool isreserved(); + void readListItems(Variant::List& list); + void readMapEntries(Variant::Map& map); +}; + +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_ADDRESSPARSER_H*/ diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp index 64ca962317..06006c9d20 100644 --- a/cpp/src/qpid/messaging/Connection.cpp +++ b/cpp/src/qpid/messaging/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/messaging/Connection.h" +#include "qpid/messaging/AddressParser.h" #include "qpid/messaging/ConnectionImpl.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/SessionImpl.h" @@ -37,18 +38,27 @@ namespace messaging { using qpid::client::PI; -Connection Connection::open(const std::string& url, const Variant::Map& options) -{ - //only support amqp 0-10 at present - Connection connection(new qpid::client::amqp0_10::ConnectionImpl(url, options)); - return connection; -} - Connection::Connection(ConnectionImpl* impl) { PI::ctor(*this, impl); } Connection::Connection(const Connection& c) : qpid::client::Handle<ConnectionImpl>() { PI::copy(*this, c); } Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); } Connection::~Connection() { PI::dtor(*this); } +Connection::Connection(const std::string& o) +{ + Variant::Map options; + AddressParser parser(o); + if (parser.parseMap(options)) { + PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(options)); + } else { + throw InvalidOptionString(o); + } +} +Connection::Connection(const Variant::Map& options) +{ + PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(options)); +} + +void Connection::open(const std::string& url) { impl->open(url); } void Connection::close() { impl->close(); } Session Connection::newSession(const char* name) { return impl->newSession(false, name); } Session Connection::newSession(const std::string& name) { return impl->newSession(false, name); } diff --git a/cpp/src/qpid/messaging/ConnectionImpl.h b/cpp/src/qpid/messaging/ConnectionImpl.h index 4eff68ff9d..589c9fbe57 100644 --- a/cpp/src/qpid/messaging/ConnectionImpl.h +++ b/cpp/src/qpid/messaging/ConnectionImpl.h @@ -36,6 +36,7 @@ class ConnectionImpl : public virtual qpid::RefCounted { public: virtual ~ConnectionImpl() {} + virtual void open(const std::string& url) = 0; virtual void close() = 0; virtual Session newSession(bool transactional, const std::string& name) = 0; virtual Session getSession(const std::string& name) const = 0; diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 91dad8f3f2..9c6f066d64 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -110,9 +110,18 @@ struct MessagingFixture : public BrokerFixture MessagingFixture(Broker::Options opts = Broker::Options()) : BrokerFixture(opts), - connection(Connection::open((boost::format("amqp:tcp:localhost:%1%") % (broker->getPort(Broker::TCP_TRANSPORT))).str())), + connection(open(broker->getPort(Broker::TCP_TRANSPORT))), session(connection.newSession()), - admin(broker->getPort(Broker::TCP_TRANSPORT)) {} + admin(broker->getPort(Broker::TCP_TRANSPORT)) + { + } + + static Connection open(uint16_t port) + { + Connection connection; + connection.open((boost::format("amqp:tcp:localhost:%1%") % (port)).str()); + return connection; + } void ping(const qpid::messaging::Address& address) { diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp index c45d76f91f..87a360ec0c 100644 --- a/cpp/src/tests/qpid_recv.cpp +++ b/cpp/src/tests/qpid_recv.cpp @@ -148,11 +148,8 @@ int main(int argc, char ** argv) Options opts; if (opts.parse(argc, argv)) { try { - Variant::Map connectionOptions; - if (opts.connectionOptions.size()) { - parseOptionString(opts.connectionOptions, connectionOptions); - } - Connection connection = Connection::open(opts.url, connectionOptions); + Connection connection(opts.connectionOptions); + connection.open(opts.url); Session session = connection.newSession(opts.tx > 0); Receiver receiver = session.createReceiver(opts.address); receiver.setCapacity(opts.capacity); diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index c58d5fa10b..9556fb000f 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -180,11 +180,8 @@ int main(int argc, char ** argv) Options opts; if (opts.parse(argc, argv)) { try { - Variant::Map connectionOptions; - if (opts.connectionOptions.size()) { - parseOptionString(opts.connectionOptions, connectionOptions); - } - Connection connection = Connection::open(opts.url, connectionOptions); + Connection connection(opts.connectionOptions); + connection.open(opts.url); Session session = connection.newSession(opts.tx > 0); Sender sender = session.createSender(opts.address); Message msg; diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp index 8195bf390e..3cc8e70809 100644 --- a/cpp/src/tests/qpid_stream.cpp +++ b/cpp/src/tests/qpid_stream.cpp @@ -72,7 +72,8 @@ struct Client : Runnable void run() { try { - Connection connection = Connection::open(opts.url); + Connection connection; + connection.open(opts.url); Session session = connection.newSession(); doWork(session); session.close(); |