diff options
Diffstat (limited to 'cpp/src/qpid/client')
23 files changed, 293 insertions, 321 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 8dc1e8338a..ab0d8e0700 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -22,6 +22,7 @@ #include "qpid/client/ConnectionHandler.h" #include "qpid/SaslFactory.h" +#include "qpid/StringUtils.h" #include "qpid/client/Bounds.h" #include "qpid/framing/amqp_framing.h" #include "qpid/framing/all_method_bodies.h" @@ -142,7 +143,9 @@ void ConnectionHandler::outgoing(AMQFrame& frame) void ConnectionHandler::waitForOpen() { waitFor(ESTABLISHED); - if (getState() == FAILED || getState() == CLOSED) { + if (getState() == FAILED) { + throw TransportFailure(errorText); + } else if (getState() == CLOSED) { throw ConnectionException(errorCode, errorText); } } @@ -202,6 +205,24 @@ void ConnectionHandler::fail(const std::string& message) namespace { std::string SPACE(" "); + +std::string join(const std::vector<std::string>& in) +{ + std::string result; + for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) { + if (result.size()) result += SPACE; + result += *i; + } + return result; +} + +void intersection(const std::vector<std::string>& a, const std::vector<std::string>& b, std::vector<std::string>& results) +{ + for (std::vector<std::string>::const_iterator i = a.begin(); i != a.end(); ++i) { + if (std::find(b.begin(), b.end(), *i) != b.end()) results.push_back(*i); + } +} + } void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/) @@ -216,26 +237,35 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me maxSsf ); - std::string mechlist; - bool chosenMechanismSupported = mechanism.empty(); - for (Array::const_iterator i = mechanisms.begin(); i != mechanisms.end(); ++i) { - if (!mechanism.empty() && mechanism == (*i)->get<std::string>()) { - chosenMechanismSupported = true; - mechlist = (*i)->get<std::string>() + SPACE + mechlist; - } else { - if (i != mechanisms.begin()) mechlist += SPACE; - mechlist += (*i)->get<std::string>(); + std::vector<std::string> mechlist; + if (mechanism.empty()) { + //mechlist is simply what the server offers + mechanisms.collect(mechlist); + } else { + //mechlist is the intersection of those indicated by user and + //those supported by server, in the order listed by user + std::vector<std::string> allowed = split(mechanism, " "); + std::vector<std::string> supported; + mechanisms.collect(supported); + intersection(allowed, supported, mechlist); + if (mechlist.empty()) { + throw Exception(QPID_MSG("Desired mechanism(s) not valid: " << mechanism << " (supported: " << join(supported) << ")")); } } - if (!chosenMechanismSupported) { - fail("Selected mechanism not supported: " + mechanism); - } - if (sasl.get()) { - string response = sasl->start(mechanism.empty() ? mechlist : mechanism, - getSecuritySettings ? getSecuritySettings() : 0); - proxy.startOk(properties, sasl->getMechanism(), response, locale); + string response; + if (sasl->start(join(mechlist), response, getSecuritySettings ? getSecuritySettings() : 0)) { + proxy.startOk(properties, sasl->getMechanism(), response, locale); + } else { + //response was null + ConnectionStartOkBody body; + body.setClientProperties(properties); + body.setMechanism(sasl->getMechanism()); + //Don't set response, as none was given + body.setLocale(locale); + proxy.send(body); + } } else { //TODO: verify that desired mechanism and locale are supported string response = ((char)0) + username + ((char)0) + password; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 40c004f166..db97f1e0f4 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -36,6 +36,7 @@ #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/lexical_cast.hpp> #include <boost/shared_ptr.hpp> #include <limits> @@ -258,16 +259,16 @@ void ConnectionImpl::open() connector->setInputHandler(&handler); connector->setShutdownHandler(this); try { - connector->connect(host, port); - + std::string p = boost::lexical_cast<std::string>(port); + connector->connect(host, p); + } catch (const std::exception& e) { QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what()); connector.reset(); - throw; + throw TransportFailure(e.what()); } connector->init(); - QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port); - + // Enable heartbeat if requested uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat; if (heartbeat) { @@ -281,6 +282,7 @@ void ConnectionImpl::open() // - in that case in connector.reset() above; // - or when we are deleted handler.waitForOpen(); + QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port); // If the SASL layer has provided an "operational" userId for the connection, // put it in the negotiated settings. diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 586012f9d6..bc611ffe0d 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -61,7 +61,7 @@ class Connector : public framing::OutputHandler static void registerFactory(const std::string& proto, Factory* connectorFactory); virtual ~Connector() {}; - virtual void connect(const std::string& host, int port) = 0; + virtual void connect(const std::string& host, const std::string& port) = 0; virtual void init() {}; virtual void close() = 0; virtual void send(framing::AMQFrame& frame) = 0; diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index 6af607198c..664640f5e7 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -95,7 +95,7 @@ class RdmaConnector : public Connector, public sys::Codec std::string identifier; - void connect(const std::string& host, int port); + void connect(const std::string& host, const std::string& port); void close(); void send(framing::AMQFrame& frame); void abort() {} // TODO: need to fix this for heartbeat timeouts to work @@ -173,7 +173,7 @@ RdmaConnector::~RdmaConnector() { } } -void RdmaConnector::connect(const std::string& host, int port){ +void RdmaConnector::connect(const std::string& host, const std::string& port){ Mutex::ScopedLock l(dataConnectedLock); assert(!dataConnected); @@ -184,7 +184,7 @@ void RdmaConnector::connect(const std::string& host, int port){ boost::bind(&RdmaConnector::disconnected, this), boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); - SocketAddress sa(host, boost::lexical_cast<std::string>(port)); + SocketAddress sa(host, port); acon->start(poller, sa); } diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index b507625b11..7cf4ef648e 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -170,6 +170,7 @@ Demux& SessionImpl::getDemux() void SessionImpl::waitForCompletion(const SequenceNumber& id) { Lock l(state); + sys::Waitable::ScopedWait w(state); waitForCompletionImpl(id); } diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 35c7e6bdf6..26c2335eda 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -114,7 +114,7 @@ class SslConnector : public Connector std::string identifier; - void connect(const std::string& host, int port); + void connect(const std::string& host, const std::string& port); void init(); void close(); void send(framing::AMQFrame& frame); @@ -190,14 +190,14 @@ SslConnector::~SslConnector() { close(); } -void SslConnector::connect(const std::string& host, int port){ +void SslConnector::connect(const std::string& host, const std::string& port){ Mutex::ScopedLock l(closedLock); assert(closed); try { socket.connect(host, port); } catch (const std::exception& e) { socket.close(); - throw ConnectionException(framing::connection::CLOSE_CODE_FRAMING_ERROR, e.what()); + throw TransportFailure(e.what()); } identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp index e284d57bec..0070b24ec0 100644 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -88,7 +88,7 @@ TCPConnector::~TCPConnector() { close(); } -void TCPConnector::connect(const std::string& host, int port) { +void TCPConnector::connect(const std::string& host, const std::string& port) { Mutex::ScopedLock l(lock); assert(closed); connector = AsynchConnector::create( @@ -117,11 +117,11 @@ void TCPConnector::connected(const Socket&) { void TCPConnector::start(sys::AsynchIO* aio_) { aio = aio_; - for (int i = 0; i < 32; i++) { + for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff(maxFrameSize)); } - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); + identifier = str(format("[%1%]") % socket.getFullAddress()); } void TCPConnector::initAmqp() { diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h index c756469182..eb3f696013 100644 --- a/cpp/src/qpid/client/TCPConnector.h +++ b/cpp/src/qpid/client/TCPConnector.h @@ -98,7 +98,7 @@ class TCPConnector : public Connector, public sys::Codec protected: virtual ~TCPConnector(); - void connect(const std::string& host, int port); + void connect(const std::string& host, const std::string& port); void start(sys::AsynchIO* aio_); void initAmqp(); virtual void connectFailed(const std::string& msg); diff --git a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp index bfb20118b5..d2accddcd0 100644 --- a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp +++ b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -30,12 +30,23 @@ void AcceptTracker::State::accept() unaccepted.clear(); } -void AcceptTracker::State::accept(qpid::framing::SequenceNumber id) +SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative) { - if (unaccepted.contains(id)) { - unaccepted.remove(id); - unconfirmed.add(id); + SequenceSet accepting; + if (cumulative) { + for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) { + accepting.add(*i); + } + unconfirmed.add(accepting); + unaccepted.remove(accepting); + } else { + if (unaccepted.contains(id)) { + unaccepted.remove(id); + unconfirmed.add(id); + accepting.add(id); + } } + return accepting; } void AcceptTracker::State::release() @@ -59,6 +70,18 @@ void AcceptTracker::delivered(const std::string& destination, const qpid::framin destinationState[destination].unaccepted.add(id); } +namespace +{ +const size_t FLUSH_FREQUENCY = 1024; +} + +void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record) +{ + pending.push_back(record); + if (pending.size() % FLUSH_FREQUENCY == 0) session.flush(); +} + + void AcceptTracker::accept(qpid::client::AsyncSession& session) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { @@ -67,20 +90,19 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session) Record record; record.status = session.messageAccept(aggregateState.unaccepted); record.accepted = aggregateState.unaccepted; - pending.push_back(record); + addToPending(session, record); aggregateState.accept(); } -void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session) +void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { - i->second.accept(id); + i->second.accept(id, cumulative); } Record record; - record.accepted.add(id); + record.accepted = aggregateState.accept(id, cumulative); record.status = session.messageAccept(record.accepted); - pending.push_back(record); - aggregateState.accept(id); + addToPending(session, record); } void AcceptTracker::release(qpid::client::AsyncSession& session) diff --git a/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/cpp/src/qpid/client/amqp0_10/AcceptTracker.h index 87890e41cc..85209c3b87 100644 --- a/cpp/src/qpid/client/amqp0_10/AcceptTracker.h +++ b/cpp/src/qpid/client/amqp0_10/AcceptTracker.h @@ -42,7 +42,7 @@ class AcceptTracker public: void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id); void accept(qpid::client::AsyncSession&); - void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&); + void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative); void release(qpid::client::AsyncSession&); uint32_t acceptsPending(); uint32_t acceptsPending(const std::string& destination); @@ -62,7 +62,7 @@ class AcceptTracker qpid::framing::SequenceSet unconfirmed; void accept(); - void accept(qpid::framing::SequenceNumber); + qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative); void release(); uint32_t acceptsPending(); void completed(qpid::framing::SequenceSet&); @@ -79,6 +79,7 @@ class AcceptTracker StateMap destinationState; Records pending; + void addToPending(qpid::client::AsyncSession&, const Record&); void checkPending(); void completed(qpid::framing::SequenceSet&); }; diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index f1295a3b66..16e5fde075 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -129,6 +129,10 @@ const std::string HEADERS_EXCHANGE("headers"); const std::string XML_EXCHANGE("xml"); const std::string WILDCARD_ANY("#"); +//exchange prefixes: +const std::string PREFIX_AMQ("amq."); +const std::string PREFIX_QPID("qpid."); + const Verifier verifier; } @@ -199,6 +203,7 @@ class Exchange : protected Node void checkCreate(qpid::client::AsyncSession&, CheckMode); void checkAssert(qpid::client::AsyncSession&, CheckMode); void checkDelete(qpid::client::AsyncSession&, CheckMode); + bool isReservedName(); protected: const std::string specifiedType; @@ -233,6 +238,8 @@ class Subscription : public Exchange, public MessageSource const bool reliable; const bool durable; const std::string actualType; + const bool exclusiveQueue; + const bool exclusiveSubscription; FieldTable queueOptions; FieldTable subscriptionOptions; Bindings bindings; @@ -307,6 +314,7 @@ struct Opt Opt& operator/(const std::string& name); operator bool() const; std::string str() const; + bool asBool(bool defaultValue) const; const Variant::List& asList() const; void collect(qpid::framing::FieldTable& args) const; @@ -338,6 +346,12 @@ Opt::operator bool() const return value && !value->isVoid() && value->asBool(); } +bool Opt::asBool(bool defaultValue) const +{ + if (value) return value->asBool(); + else return defaultValue; +} + std::string Opt::str() const { if (value) return value->asString(); @@ -481,7 +495,7 @@ std::string Subscription::getSubscriptionName(const std::string& base, const std if (name.empty()) { return (boost::format("%1%_%2%") % base % Uuid(true).str()).str(); } else { - return (boost::format("%1%_%2%") % base % name).str(); + return name; } } @@ -490,7 +504,9 @@ Subscription::Subscription(const Address& address, const std::string& type) queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())), reliable(AddressResolution::is_reliable(address)), durable(Opt(address)/LINK/DURABLE), - actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type) + actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type), + exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)), + exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)) { (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); @@ -550,7 +566,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str checkAssert(session, FOR_RECEIVER); //create subscription queue: - session.queueDeclare(arg::queue=queue, arg::exclusive=true, + session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue, arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions); //'default' binding: bindings.bind(session); @@ -559,15 +575,15 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str linkBindings.bind(session); //subscribe to subscription queue: AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE; - session.messageSubscribe(arg::queue=queue, arg::destination=destination, - arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions); + session.messageSubscribe(arg::queue=queue, arg::destination=destination, + arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions); } void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination) { linkBindings.unbind(session); session.messageCancel(destination); - session.queueDelete(arg::queue=queue); + if (exclusiveQueue) session.queueDelete(arg::queue=queue, arg::ifUnused=true); checkDelete(session, FOR_RECEIVER); } @@ -761,18 +777,32 @@ Exchange::Exchange(const Address& a) : Node(a), linkBindings.setDefaultExchange(name); } +bool Exchange::isReservedName() +{ + return name.find(PREFIX_AMQ) != std::string::npos || name.find(PREFIX_QPID) != std::string::npos; +} + void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) { if (enabled(createPolicy, mode)) { try { - std::string type = specifiedType; - if (type.empty()) type = TOPIC_EXCHANGE; - session.exchangeDeclare(arg::exchange=name, - arg::type=type, - arg::durable=durable, - arg::autoDelete=autoDelete, - arg::alternateExchange=alternateExchange, - arg::arguments=arguments); + if (isReservedName()) { + try { + sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); + } catch (const qpid::framing::NotFoundException& /*e*/) { + throw ResolutionError((boost::format("Cannot create exchange %1%; names beginning with \"amq.\" or \"qpid.\" are reserved.") % name).str()); + } + + } else { + std::string type = specifiedType; + if (type.empty()) type = TOPIC_EXCHANGE; + session.exchangeDeclare(arg::exchange=name, + arg::type=type, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); + } nodeBindings.bind(session); session.sync(); } catch (const qpid::framing::NotAllowedException& e) { @@ -822,7 +852,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); - } else if (i->second != v) { + } else if (*i->second != *v) { throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 5a545c1f6a..cc6e9b9ab2 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -20,7 +20,6 @@ */ #include "ConnectionImpl.h" #include "SessionImpl.h" -#include "SimpleUrlParser.h" #include "qpid/messaging/exceptions.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/PrivateImplRef.h" @@ -39,26 +38,18 @@ using qpid::types::Variant; using qpid::types::VAR_LIST; using qpid::framing::Uuid; -void convert(const Variant::List& from, std::vector<std::string>& to) -{ - for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) { - to.push_back(i->asString()); - } +namespace { +void merge(const std::string& value, std::vector<std::string>& list) { + if (std::find(list.begin(), list.end(), value) == list.end()) + list.push_back(value); } -template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value) +void merge(const Variant::List& from, std::vector<std::string>& to) { - Variant::Map::const_iterator i = map.find(key); - if (i != map.end()) { - value = (T) i->second; - QPID_LOG(debug, "option " << key << " specified as " << i->second); - return true; - } else { - return false; - } + for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) + merge(i->asString(), to); } -namespace { std::string asString(const std::vector<std::string>& v) { std::stringstream os; os << "["; @@ -71,49 +62,8 @@ std::string asString(const std::vector<std::string>& v) { } } -template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map, - const std::string& key, - std::vector<std::string>& value) -{ - Variant::Map::const_iterator i = map.find(key); - if (i != map.end()) { - value.clear(); - if (i->second.getType() == VAR_LIST) { - convert(i->second.asList(), value); - } else { - value.push_back(i->second.asString()); - } - QPID_LOG(debug, "option " << key << " specified as " << asString(value)); - return true; - } else { - return false; - } -} - -void convert(const Variant::Map& from, ConnectionSettings& to) -{ - setIfFound(from, "username", to.username); - setIfFound(from, "password", to.password); - setIfFound(from, "sasl-mechanism", to.mechanism); - setIfFound(from, "sasl-service", to.service); - setIfFound(from, "sasl-min-ssf", to.minSsf); - setIfFound(from, "sasl-max-ssf", to.maxSsf); - - setIfFound(from, "heartbeat", to.heartbeat); - setIfFound(from, "tcp-nodelay", to.tcpNoDelay); - - setIfFound(from, "locale", to.locale); - setIfFound(from, "max-channels", to.maxChannels); - setIfFound(from, "max-frame-size", to.maxFrameSize); - setIfFound(from, "bounds", to.bounds); - - setIfFound(from, "transport", to.protocol); - - setIfFound(from, "ssl-cert-name", to.sslCertName); -} - ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : - reconnect(false), timeout(-1), limit(-1), + replaceUrls(false), reconnect(false), timeout(-1), limit(-1), minReconnectInterval(3), maxReconnectInterval(60), retries(0), reconnectOnLimitExceeded(true) { @@ -124,27 +74,69 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio void ConnectionImpl::setOptions(const Variant::Map& options) { - sys::Mutex::ScopedLock l(lock); - convert(options, settings); - setIfFound(options, "reconnect", reconnect); - setIfFound(options, "reconnect-timeout", timeout); - setIfFound(options, "reconnect-limit", limit); - int64_t reconnectInterval; - if (setIfFound(options, "reconnect-interval", reconnectInterval)) { - minReconnectInterval = maxReconnectInterval = reconnectInterval; - } else { - setIfFound(options, "reconnect-interval-min", minReconnectInterval); - setIfFound(options, "reconnect-interval-max", maxReconnectInterval); + for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) { + setOption(i->first, i->second); } - setIfFound(options, "reconnect-urls", urls); - setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded); } void ConnectionImpl::setOption(const std::string& name, const Variant& value) { - Variant::Map options; - options[name] = value; - setOptions(options); + sys::Mutex::ScopedLock l(lock); + if (name == "reconnect") { + reconnect = value; + } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { + timeout = value; + } else if (name == "reconnect-limit" || name == "reconnect_limit") { + limit = value; + } else if (name == "reconnect-interval" || name == "reconnect_interval") { + maxReconnectInterval = minReconnectInterval = value; + } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") { + minReconnectInterval = value; + } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") { + maxReconnectInterval = value; + } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") { + replaceUrls = value.asBool(); + } else if (name == "reconnect-urls" || name == "reconnect_urls") { + if (replaceUrls) urls.clear(); + if (value.getType() == VAR_LIST) { + merge(value.asList(), urls); + } else { + merge(value.asString(), urls); + } + } else if (name == "username") { + settings.username = value.asString(); + } else if (name == "password") { + settings.password = value.asString(); + } else if (name == "sasl-mechanism" || name == "sasl_mechanism" || + name == "sasl-mechanisms" || name == "sasl_mechanisms") { + settings.mechanism = value.asString(); + } else if (name == "sasl-service" || name == "sasl_service") { + settings.service = value.asString(); + } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") { + settings.minSsf = value; + } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") { + settings.maxSsf = value; + } else if (name == "heartbeat") { + settings.heartbeat = value; + } else if (name == "tcp-nodelay" || name == "tcp_nodelay") { + settings.tcpNoDelay = value; + } else if (name == "locale") { + settings.locale = value.asString(); + } else if (name == "max-channels" || name == "max_channels") { + settings.maxChannels = value; + } else if (name == "max-frame-size" || name == "max_frame_size") { + settings.maxFrameSize = value; + } else if (name == "bounds") { + settings.bounds = value; + } else if (name == "transport") { + settings.protocol = value.asString(); + } else if (name == "ssl-cert-name" || name == "ssl_cert_name") { + settings.sslCertName = value.asString(); + } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") { + reconnectOnLimitExceeded = value; + } else { + throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); + } } @@ -214,7 +206,7 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st sessions[name] = impl; break; } catch (const qpid::TransportFailure&) { - open(); + reopen(); } catch (const qpid::SessionException& e) { throw qpid::messaging::SessionError(e.what()); } catch (const std::exception& e) { @@ -235,6 +227,15 @@ void ConnectionImpl::open() catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); } } +void ConnectionImpl::reopen() +{ + if (!reconnect) { + throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); + } + open(); +} + + bool expired(const qpid::sys::AbsTime& start, int64_t timeout) { if (timeout == 0) return true; @@ -262,14 +263,9 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) } void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) { - if (more.size()) { - for (size_t i = 0; i < more.size(); ++i) { - if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) { - urls.push_back(more[i].str()); - } - } - QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); - } + for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i) + merge(i->str(), urls); + QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); } bool ConnectionImpl::tryConnect() @@ -278,21 +274,14 @@ bool ConnectionImpl::tryConnect() for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) { try { QPID_LOG(info, "Trying to connect to " << *i << "..."); - //TODO: when url support is more complete can avoid this test here - if (i->find("amqp:") == 0) { - Url url(*i); - connection.open(url, settings); - } else { - SimpleUrlParser::parse(*i, settings); - connection.open(settings); - } + Url url(*i); + if (url.getUser().size()) settings.username = url.getUser(); + if (url.getPass().size()) settings.password = url.getPass(); + connection.open(url, settings); QPID_LOG(info, "Connected to " << *i); mergeUrls(connection.getInitialBrokers(), l); return resetSessions(l); - } catch (const qpid::ConnectionException& e) { - //TODO: need to fix timeout on - //qpid::client::Connection::open() so that it throws - //TransportFailure rather than a ConnectionException + } catch (const qpid::TransportFailure& e) { QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); } } diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 09f2038312..1b58cbbe3e 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -43,6 +43,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl public: ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options); void open(); + void reopen(); bool isOpen() const; void close(); qpid::messaging::Session newSession(bool transactional, const std::string& name); @@ -59,6 +60,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl qpid::sys::Semaphore semaphore;//used to coordinate reconnection Sessions sessions; qpid::client::Connection connection; + bool replaceUrls; // Replace rather than merging with reconnect-urls std::vector<std::string> urls; qpid::client::ConnectionSettings settings; bool reconnect; diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 71e89bdba1..3badaf40ba 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -144,10 +144,10 @@ void IncomingMessages::accept() acceptTracker.accept(session); } -void IncomingMessages::accept(qpid::framing::SequenceNumber id) +void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative) { sys::Mutex::ScopedLock l(lock); - acceptTracker.accept(id, session); + acceptTracker.accept(id, session, cumulative); } @@ -301,6 +301,7 @@ const std::string SUBJECT("qpid.subject"); const std::string X_APP_ID("x-amqp-0-10.app-id"); const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key"); const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding"); +const std::string X_TIMESTAMP("x-amqp-0-10.timestamp"); } void populateHeaders(qpid::messaging::Message& message, @@ -334,10 +335,13 @@ void populateHeaders(qpid::messaging::Message& message, if (messageProperties->hasContentEncoding()) { message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding(); } - // routing-key, others? + // routing-key, timestamp, others? if (deliveryProperties && deliveryProperties->hasRoutingKey()) { message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey(); } + if (deliveryProperties && deliveryProperties->hasTimestamp()) { + message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp(); + } } } diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index f6a291bc68..9053b70312 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -72,7 +72,7 @@ class IncomingMessages bool get(Handler& handler, qpid::sys::Duration timeout); bool getNextDestination(std::string& destination, qpid::sys::Duration timeout); void accept(); - void accept(qpid::framing::SequenceNumber id); + void accept(qpid::framing::SequenceNumber id, bool cumulative); void releaseAll(); void releasePending(const std::string& destination); diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index 82358961c8..d93416da75 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -59,7 +59,9 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); } translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders()); - message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds()); + if (from.getTtl().getMilliseconds()) { + message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds()); + } if (from.getDurable()) { message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT); } diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index e1b75ec0cf..f2f0f1a9e5 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -135,6 +135,7 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) void SenderImpl::replay(const sys::Mutex::ScopedLock&) { for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { + i->message.setRedelivered(true); sink->send(session, name, *i); } } @@ -147,7 +148,7 @@ uint32_t SenderImpl::checkPendingSends(bool flush) { uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&) { if (flush) { - session.flush(); + session.flush(); flushed = true; } else { flushed = false; diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 75a71997fd..be5eab1f2b 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -60,12 +60,14 @@ SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactio void SessionImpl::checkError() { + ScopedLock l(lock); qpid::client::SessionBase_0_10Access s(session); s.get()->assertOpen(); } bool SessionImpl::hasError() { + ScopedLock l(lock); qpid::client::SessionBase_0_10Access s(session); return s.get()->hasError(); } @@ -112,13 +114,14 @@ void SessionImpl::release(qpid::messaging::Message& m) execute1<Release>(m); } -void SessionImpl::acknowledge(qpid::messaging::Message& m) +void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative) { //Should probably throw an exception on failure here, or indicate //it through a return type at least. Failure means that the //message may be redelivered; i.e. the application cannot delete //any state necessary for preventing reprocessing of the message - execute1<Acknowledge1>(m); + Acknowledge2 ack(*this, m, cumulative); + execute(ack); } void SessionImpl::close() @@ -128,27 +131,29 @@ void SessionImpl::close() senders.clear(); receivers.clear(); } else { - while (true) { - Sender s; - { - ScopedLock l(lock); - if (senders.empty()) break; - s = senders.begin()->second; - } - s.close(); // outside the lock, will call senderCancelled + Senders sCopy; + Receivers rCopy; + { + ScopedLock l(lock); + senders.swap(sCopy); + receivers.swap(rCopy); } - while (true) { - Receiver r; - { - ScopedLock l(lock); - if (receivers.empty()) break; - r = receivers.begin()->second; - } - r.close(); // outside the lock, will call receiverCancelled + for (Senders::iterator i = sCopy.begin(); i != sCopy.end(); ++i) + { + // outside the lock, will call senderCancelled + i->second.close(); + } + for (Receivers::iterator i = rCopy.begin(); i != rCopy.end(); ++i) + { + // outside the lock, will call receiverCancelled + i->second.close(); } } connection->closed(*this); - if (!hasError()) session.close(); + if (!hasError()) { + ScopedLock l(lock); + session.close(); + } } template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t) @@ -431,8 +436,11 @@ uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination) void SessionImpl::syncImpl(bool block) { - if (block) session.sync(); - else session.flush(); + { + ScopedLock l(lock); + if (block) session.sync(); + else session.flush(); + } //cleanup unconfirmed accept records: incoming.pendingAccept(); } @@ -467,10 +475,10 @@ void SessionImpl::acknowledgeImpl() if (!transactional) incoming.accept(); } -void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m) +void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative) { ScopedLock l(lock); - if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId()); + if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative); } void SessionImpl::rejectImpl(qpid::messaging::Message& m) @@ -509,7 +517,7 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection->open(); + connection->reopen(); } bool SessionImpl::backoff() diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 2a2aa47df6..c7dea77d18 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -63,7 +63,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void acknowledge(bool sync); void reject(qpid::messaging::Message&); void release(qpid::messaging::Message&); - void acknowledge(qpid::messaging::Message& msg); + void acknowledge(qpid::messaging::Message& msg, bool cumulative); void close(); void sync(bool block); qpid::messaging::Sender createSender(const qpid::messaging::Address& address); @@ -139,7 +139,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void commitImpl(); void rollbackImpl(); void acknowledgeImpl(); - void acknowledgeImpl(qpid::messaging::Message&); + void acknowledgeImpl(qpid::messaging::Message&, bool cumulative); void rejectImpl(qpid::messaging::Message&); void releaseImpl(qpid::messaging::Message&); void closeImpl(); @@ -204,12 +204,13 @@ class SessionImpl : public qpid::messaging::SessionImpl void operator()() { impl.releaseImpl(message); } }; - struct Acknowledge1 : Command + struct Acknowledge2 : Command { qpid::messaging::Message& message; + bool cumulative; - Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} - void operator()() { impl.acknowledgeImpl(message); } + Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {} + void operator()() { impl.acknowledgeImpl(message, cumulative); } }; struct CreateSender; diff --git a/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.cpp b/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.cpp deleted file mode 100644 index 327c2274a6..0000000000 --- a/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "SimpleUrlParser.h" -#include "qpid/client/ConnectionSettings.h" -#include "qpid/Exception.h" -#include <boost/lexical_cast.hpp> - -namespace qpid { -namespace client { -namespace amqp0_10 { - -bool split(const std::string& in, char delim, std::pair<std::string, std::string>& result) -{ - std::string::size_type i = in.find(delim); - if (i != std::string::npos) { - result.first = in.substr(0, i); - if (i+1 < in.size()) { - result.second = in.substr(i+1); - } - return true; - } else { - return false; - } -} - -void parseUsernameAndPassword(const std::string& in, qpid::client::ConnectionSettings& result) -{ - std::pair<std::string, std::string> parts; - if (!split(in, '/', parts)) { - result.username = in; - } else { - result.username = parts.first; - result.password = parts.second; - } -} - -void parseHostAndPort(const std::string& in, qpid::client::ConnectionSettings& result) -{ - std::pair<std::string, std::string> parts; - if (!split(in, ':', parts)) { - result.host = in; - } else { - result.host = parts.first; - if (parts.second.size()) { - result.port = boost::lexical_cast<uint16_t>(parts.second); - } - } -} - -void SimpleUrlParser::parse(const std::string& url, qpid::client::ConnectionSettings& result) -{ - std::pair<std::string, std::string> parts; - if (!split(url, '@', parts)) { - parseHostAndPort(url, result); - } else { - parseUsernameAndPassword(parts.first, result); - parseHostAndPort(parts.second, result); - } -} - -}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.h b/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.h deleted file mode 100644 index 24f90ca9d6..0000000000 --- a/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.h +++ /dev/null @@ -1,42 +0,0 @@ -#ifndef QPID_CLIENT_AMQP0_10_SIMPLEURLPARSER_H -#define QPID_CLIENT_AMQP0_10_SIMPLEURLPARSER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -namespace qpid { -namespace client { - -struct ConnectionSettings; - -namespace amqp0_10 { - -/** - * Parses a simple url of the form user/password@hostname:port - */ -struct SimpleUrlParser -{ - static void parse(const std::string& url, qpid::client::ConnectionSettings& result); -}; -}}} // namespace qpid::client::amqp0_10 - -#endif /*!QPID_CLIENT_AMQP0_10_SIMPLEURLPARSER_H*/ diff --git a/cpp/src/qpid/client/windows/SaslFactory.cpp b/cpp/src/qpid/client/windows/SaslFactory.cpp index 63c7fa3d1f..53d825771b 100644 --- a/cpp/src/qpid/client/windows/SaslFactory.cpp +++ b/cpp/src/qpid/client/windows/SaslFactory.cpp @@ -71,7 +71,7 @@ class WindowsSasl : public Sasl public: WindowsSasl( const std::string &, const std::string &, const std::string &, const std::string &, int, int ); ~WindowsSasl(); - std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings); + bool start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings); std::string step(const std::string& challenge); std::string getMechanism(); std::string getUserId(); @@ -121,8 +121,8 @@ WindowsSasl::~WindowsSasl() { } -std::string WindowsSasl::start(const std::string& mechanisms, - const SecuritySettings* /*externalSettings*/) +bool WindowsSasl::start(const std::string& mechanisms, std::string& response, + const SecuritySettings* /*externalSettings*/) { QPID_LOG(debug, "WindowsSasl::start(" << mechanisms << ")"); @@ -142,18 +142,18 @@ std::string WindowsSasl::start(const std::string& mechanisms, if (!haveAnon && !havePlain) throw InternalErrorException(QPID_MSG("Sasl error: no common mechanism")); - std::string resp = ""; if (havePlain) { mechanism = PLAIN; - resp = ((char)0) + settings.username + ((char)0) + settings.password; + response = ((char)0) + settings.username + ((char)0) + settings.password; } else { mechanism = ANONYMOUS; + response = ""; } - return resp; + return true; } -std::string WindowsSasl::step(const std::string& challenge) +std::string WindowsSasl::step(const std::string& /*challenge*/) { // Shouldn't get this for PLAIN... throw InternalErrorException(QPID_MSG("Sasl step error")); @@ -169,7 +169,7 @@ std::string WindowsSasl::getUserId() return std::string(); // TODO - when GSSAPI is supported, return userId for connection. } -std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t maxFrameSize) +std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t /*maxFrameSize*/) { return std::auto_ptr<SecurityLayer>(0); } diff --git a/cpp/src/qpid/client/windows/SslConnector.cpp b/cpp/src/qpid/client/windows/SslConnector.cpp index a33713e1a8..785c817928 100644 --- a/cpp/src/qpid/client/windows/SslConnector.cpp +++ b/cpp/src/qpid/client/windows/SslConnector.cpp @@ -77,7 +77,7 @@ public: framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); - virtual void connect(const std::string& host, int port); + virtual void connect(const std::string& host, const std::string& port); virtual void connected(const Socket&); unsigned int getSSF(); }; @@ -153,7 +153,7 @@ SslConnector::~SslConnector() // Will this get reach via virtual method via boost::bind???? -void SslConnector::connect(const std::string& host, int port) { +void SslConnector::connect(const std::string& host, const std::string& port) { brokerHost = host; TCPConnector::connect(host, port); } |