diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10')
11 files changed, 121 insertions, 157 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp index d2accddcd0..bfb20118b5 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -30,23 +30,12 @@ void AcceptTracker::State::accept() unaccepted.clear(); } -SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative) +void AcceptTracker::State::accept(qpid::framing::SequenceNumber id) { - SequenceSet accepting; - if (cumulative) { - for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) { - accepting.add(*i); - } - unconfirmed.add(accepting); - unaccepted.remove(accepting); - } else { - if (unaccepted.contains(id)) { - unaccepted.remove(id); - unconfirmed.add(id); - accepting.add(id); - } + if (unaccepted.contains(id)) { + unaccepted.remove(id); + unconfirmed.add(id); } - return accepting; } void AcceptTracker::State::release() @@ -70,18 +59,6 @@ void AcceptTracker::delivered(const std::string& destination, const qpid::framin destinationState[destination].unaccepted.add(id); } -namespace -{ -const size_t FLUSH_FREQUENCY = 1024; -} - -void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record) -{ - pending.push_back(record); - if (pending.size() % FLUSH_FREQUENCY == 0) session.flush(); -} - - void AcceptTracker::accept(qpid::client::AsyncSession& session) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { @@ -90,19 +67,20 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session) Record record; record.status = session.messageAccept(aggregateState.unaccepted); record.accepted = aggregateState.unaccepted; - addToPending(session, record); + pending.push_back(record); aggregateState.accept(); } -void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative) +void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { - i->second.accept(id, cumulative); + i->second.accept(id); } Record record; - record.accepted = aggregateState.accept(id, cumulative); + record.accepted.add(id); record.status = session.messageAccept(record.accepted); - addToPending(session, record); + pending.push_back(record); + aggregateState.accept(id); } void AcceptTracker::release(qpid::client::AsyncSession& session) diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h index 85209c3b87..87890e41cc 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h @@ -42,7 +42,7 @@ class AcceptTracker public: void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id); void accept(qpid::client::AsyncSession&); - void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative); + void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&); void release(qpid::client::AsyncSession&); uint32_t acceptsPending(); uint32_t acceptsPending(const std::string& destination); @@ -62,7 +62,7 @@ class AcceptTracker qpid::framing::SequenceSet unconfirmed; void accept(); - qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative); + void accept(qpid::framing::SequenceNumber); void release(); uint32_t acceptsPending(); void completed(qpid::framing::SequenceSet&); @@ -79,7 +79,6 @@ class AcceptTracker StateMap destinationState; Records pending; - void addToPending(qpid::client::AsyncSession&, const Record&); void checkPending(); void completed(qpid::framing::SequenceSet&); }; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index cea7eb0b51..f1295a3b66 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -233,8 +233,6 @@ class Subscription : public Exchange, public MessageSource const bool reliable; const bool durable; const std::string actualType; - const bool exclusiveQueue; - const bool exclusiveSubscription; FieldTable queueOptions; FieldTable subscriptionOptions; Bindings bindings; @@ -309,7 +307,6 @@ struct Opt Opt& operator/(const std::string& name); operator bool() const; std::string str() const; - bool asBool(bool defaultValue) const; const Variant::List& asList() const; void collect(qpid::framing::FieldTable& args) const; @@ -341,12 +338,6 @@ Opt::operator bool() const return value && !value->isVoid() && value->asBool(); } -bool Opt::asBool(bool defaultValue) const -{ - if (value) return value->asBool(); - else return defaultValue; -} - std::string Opt::str() const { if (value) return value->asString(); @@ -499,9 +490,7 @@ Subscription::Subscription(const Address& address, const std::string& type) queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())), reliable(AddressResolution::is_reliable(address)), durable(Opt(address)/LINK/DURABLE), - actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type), - exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)), - exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)) + actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type) { (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); @@ -561,7 +550,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str checkAssert(session, FOR_RECEIVER); //create subscription queue: - session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue, + session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions); //'default' binding: bindings.bind(session); @@ -570,15 +559,15 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str linkBindings.bind(session); //subscribe to subscription queue: AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE; - session.messageSubscribe(arg::queue=queue, arg::destination=destination, - arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions); + session.messageSubscribe(arg::queue=queue, arg::destination=destination, + arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions); } void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination) { linkBindings.unbind(session); session.messageCancel(destination); - if (reliable) session.queueDelete(arg::queue=queue, arg::ifUnused=true); + session.queueDelete(arg::queue=queue); checkDelete(session, FOR_RECEIVER); } @@ -833,7 +822,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); - } else if (*i->second != *v) { + } else if (i->second != v) { throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 3492938156..5a545c1f6a 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -39,18 +39,26 @@ using qpid::types::Variant; using qpid::types::VAR_LIST; using qpid::framing::Uuid; -namespace { -void merge(const std::string& value, std::vector<std::string>& list) { - if (std::find(list.begin(), list.end(), value) == list.end()) - list.push_back(value); +void convert(const Variant::List& from, std::vector<std::string>& to) +{ + for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) { + to.push_back(i->asString()); + } } -void merge(const Variant::List& from, std::vector<std::string>& to) +template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value) { - for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) - merge(i->asString(), to); + Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + value = (T) i->second; + QPID_LOG(debug, "option " << key << " specified as " << i->second); + return true; + } else { + return false; + } } +namespace { std::string asString(const std::vector<std::string>& v) { std::stringstream os; os << "["; @@ -63,8 +71,49 @@ std::string asString(const std::vector<std::string>& v) { } } +template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map, + const std::string& key, + std::vector<std::string>& value) +{ + Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + value.clear(); + if (i->second.getType() == VAR_LIST) { + convert(i->second.asList(), value); + } else { + value.push_back(i->second.asString()); + } + QPID_LOG(debug, "option " << key << " specified as " << asString(value)); + return true; + } else { + return false; + } +} + +void convert(const Variant::Map& from, ConnectionSettings& to) +{ + setIfFound(from, "username", to.username); + setIfFound(from, "password", to.password); + setIfFound(from, "sasl-mechanism", to.mechanism); + setIfFound(from, "sasl-service", to.service); + setIfFound(from, "sasl-min-ssf", to.minSsf); + setIfFound(from, "sasl-max-ssf", to.maxSsf); + + setIfFound(from, "heartbeat", to.heartbeat); + setIfFound(from, "tcp-nodelay", to.tcpNoDelay); + + setIfFound(from, "locale", to.locale); + setIfFound(from, "max-channels", to.maxChannels); + setIfFound(from, "max-frame-size", to.maxFrameSize); + setIfFound(from, "bounds", to.bounds); + + setIfFound(from, "transport", to.protocol); + + setIfFound(from, "ssl-cert-name", to.sslCertName); +} + ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : - replaceUrls(false), reconnect(false), timeout(-1), limit(-1), + reconnect(false), timeout(-1), limit(-1), minReconnectInterval(3), maxReconnectInterval(60), retries(0), reconnectOnLimitExceeded(true) { @@ -75,67 +124,27 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio void ConnectionImpl::setOptions(const Variant::Map& options) { - for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) { - setOption(i->first, i->second); + sys::Mutex::ScopedLock l(lock); + convert(options, settings); + setIfFound(options, "reconnect", reconnect); + setIfFound(options, "reconnect-timeout", timeout); + setIfFound(options, "reconnect-limit", limit); + int64_t reconnectInterval; + if (setIfFound(options, "reconnect-interval", reconnectInterval)) { + minReconnectInterval = maxReconnectInterval = reconnectInterval; + } else { + setIfFound(options, "reconnect-interval-min", minReconnectInterval); + setIfFound(options, "reconnect-interval-max", maxReconnectInterval); } + setIfFound(options, "reconnect-urls", urls); + setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded); } void ConnectionImpl::setOption(const std::string& name, const Variant& value) { - sys::Mutex::ScopedLock l(lock); - if (name == "reconnect") { - reconnect = value; - } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { - timeout = value; - } else if (name == "reconnect-limit" || name == "reconnect_limit") { - limit = value; - } else if (name == "reconnect-interval" || name == "reconnect_interval") { - maxReconnectInterval = minReconnectInterval = value; - } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") { - minReconnectInterval = value; - } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") { - maxReconnectInterval = value; - } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") { - replaceUrls = value.asBool(); - } else if (name == "reconnect-urls" || name == "reconnect_urls") { - if (replaceUrls) urls.clear(); - if (value.getType() == VAR_LIST) { - merge(value.asList(), urls); - } else { - merge(value.asString(), urls); - } - } else if (name == "username") { - settings.username = value.asString(); - } else if (name == "password") { - settings.password = value.asString(); - } else if (name == "sasl-mechanism" || name == "sasl_mechanism" || - name == "sasl-mechanisms" || name == "sasl_mechanisms") { - settings.mechanism = value.asString(); - } else if (name == "sasl-service" || name == "sasl_service") { - settings.service = value.asString(); - } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") { - settings.minSsf = value; - } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") { - settings.maxSsf = value; - } else if (name == "heartbeat") { - settings.heartbeat = value; - } else if (name == "tcp-nodelay" || name == "tcp_nodelay") { - settings.tcpNoDelay = value; - } else if (name == "locale") { - settings.locale = value.asString(); - } else if (name == "max-channels" || name == "max_channels") { - settings.maxChannels = value; - } else if (name == "max-frame-size" || name == "max_frame_size") { - settings.maxFrameSize = value; - } else if (name == "bounds") { - settings.bounds = value; - } else if (name == "transport") { - settings.protocol = value.asString(); - } else if (name == "ssl-cert-name" || name == "ssl_cert_name") { - settings.sslCertName = value.asString(); - } else { - throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); - } + Variant::Map options; + options[name] = value; + setOptions(options); } @@ -205,7 +214,7 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st sessions[name] = impl; break; } catch (const qpid::TransportFailure&) { - reopen(); + open(); } catch (const qpid::SessionException& e) { throw qpid::messaging::SessionError(e.what()); } catch (const std::exception& e) { @@ -226,15 +235,6 @@ void ConnectionImpl::open() catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); } } -void ConnectionImpl::reopen() -{ - if (!reconnect) { - throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); - } - open(); -} - - bool expired(const qpid::sys::AbsTime& start, int64_t timeout) { if (timeout == 0) return true; @@ -262,9 +262,14 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) } void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) { - for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i) - merge(i->str(), urls); - QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); + if (more.size()) { + for (size_t i = 0; i < more.size(); ++i) { + if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) { + urls.push_back(more[i].str()); + } + } + QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); + } } bool ConnectionImpl::tryConnect() @@ -284,7 +289,10 @@ bool ConnectionImpl::tryConnect() QPID_LOG(info, "Connected to " << *i); mergeUrls(connection.getInitialBrokers(), l); return resetSessions(l); - } catch (const qpid::TransportFailure& e) { + } catch (const qpid::ConnectionException& e) { + //TODO: need to fix timeout on + //qpid::client::Connection::open() so that it throws + //TransportFailure rather than a ConnectionException QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); } } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 1b58cbbe3e..09f2038312 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -43,7 +43,6 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl public: ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options); void open(); - void reopen(); bool isOpen() const; void close(); qpid::messaging::Session newSession(bool transactional, const std::string& name); @@ -60,7 +59,6 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl qpid::sys::Semaphore semaphore;//used to coordinate reconnection Sessions sessions; qpid::client::Connection connection; - bool replaceUrls; // Replace rather than merging with reconnect-urls std::vector<std::string> urls; qpid::client::ConnectionSettings settings; bool reconnect; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 5cf20c92eb..71e89bdba1 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -144,10 +144,10 @@ void IncomingMessages::accept() acceptTracker.accept(session); } -void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative) +void IncomingMessages::accept(qpid::framing::SequenceNumber id) { sys::Mutex::ScopedLock l(lock); - acceptTracker.accept(id, session, cumulative); + acceptTracker.accept(id, session); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index 9053b70312..f6a291bc68 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -72,7 +72,7 @@ class IncomingMessages bool get(Handler& handler, qpid::sys::Duration timeout); bool getNextDestination(std::string& destination, qpid::sys::Duration timeout); void accept(); - void accept(qpid::framing::SequenceNumber id, bool cumulative); + void accept(qpid::framing::SequenceNumber id); void releaseAll(); void releasePending(const std::string& destination); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index d93416da75..82358961c8 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -59,9 +59,7 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); } translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders()); - if (from.getTtl().getMilliseconds()) { - message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds()); - } + message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds()); if (from.getDurable()) { message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index f2f0f1a9e5..e1b75ec0cf 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -135,7 +135,6 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) void SenderImpl::replay(const sys::Mutex::ScopedLock&) { for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { - i->message.setRedelivered(true); sink->send(session, name, *i); } } @@ -148,7 +147,7 @@ uint32_t SenderImpl::checkPendingSends(bool flush) { uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&) { if (flush) { - session.flush(); + session.flush(); flushed = true; } else { flushed = false; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index e6255dcd6f..75a71997fd 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -112,14 +112,13 @@ void SessionImpl::release(qpid::messaging::Message& m) execute1<Release>(m); } -void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative) +void SessionImpl::acknowledge(qpid::messaging::Message& m) { //Should probably throw an exception on failure here, or indicate //it through a return type at least. Failure means that the //message may be redelivered; i.e. the application cannot delete //any state necessary for preventing reprocessing of the message - Acknowledge2 ack(*this, m, cumulative); - execute(ack); + execute1<Acknowledge1>(m); } void SessionImpl::close() @@ -432,11 +431,8 @@ uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination) void SessionImpl::syncImpl(bool block) { - { - ScopedLock l(lock); - if (block) session.sync(); - else session.flush(); - } + if (block) session.sync(); + else session.flush(); //cleanup unconfirmed accept records: incoming.pendingAccept(); } @@ -471,10 +467,10 @@ void SessionImpl::acknowledgeImpl() if (!transactional) incoming.accept(); } -void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative) +void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m) { ScopedLock l(lock); - if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative); + if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId()); } void SessionImpl::rejectImpl(qpid::messaging::Message& m) @@ -513,7 +509,7 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection->reopen(); + connection->open(); } bool SessionImpl::backoff() diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h index c7dea77d18..2a2aa47df6 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -63,7 +63,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void acknowledge(bool sync); void reject(qpid::messaging::Message&); void release(qpid::messaging::Message&); - void acknowledge(qpid::messaging::Message& msg, bool cumulative); + void acknowledge(qpid::messaging::Message& msg); void close(); void sync(bool block); qpid::messaging::Sender createSender(const qpid::messaging::Address& address); @@ -139,7 +139,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void commitImpl(); void rollbackImpl(); void acknowledgeImpl(); - void acknowledgeImpl(qpid::messaging::Message&, bool cumulative); + void acknowledgeImpl(qpid::messaging::Message&); void rejectImpl(qpid::messaging::Message&); void releaseImpl(qpid::messaging::Message&); void closeImpl(); @@ -204,13 +204,12 @@ class SessionImpl : public qpid::messaging::SessionImpl void operator()() { impl.releaseImpl(message); } }; - struct Acknowledge2 : Command + struct Acknowledge1 : Command { qpid::messaging::Message& message; - bool cumulative; - Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {} - void operator()() { impl.acknowledgeImpl(message, cumulative); } + Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} + void operator()() { impl.acknowledgeImpl(message); } }; struct CreateSender; |