summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/amqp0_10
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-08-14 16:21:34 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-08-14 16:21:34 +0000
commitd84a3a50dbb794c4383de7e5eca730ca602771e7 (patch)
tree7c6177573a2eedc172de2cbd8354ce7b4ea1e8fe /qpid/cpp/src/qpid/client/amqp0_10
parent0aba202a7e2483f04fc77bbe4faa88bb86fe5b9b (diff)
parent47551f3aa2dd46b8daeeb9683a668464203ffa06 (diff)
downloadqpid-python-d84a3a50dbb794c4383de7e5eca730ca602771e7.tar.gz
Create sandbox from correct revision
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1157557 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp42
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h5
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp23
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp164
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp3
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp18
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h11
11 files changed, 121 insertions, 157 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
index d2accddcd0..bfb20118b5 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
@@ -30,23 +30,12 @@ void AcceptTracker::State::accept()
unaccepted.clear();
}
-SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative)
+void AcceptTracker::State::accept(qpid::framing::SequenceNumber id)
{
- SequenceSet accepting;
- if (cumulative) {
- for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) {
- accepting.add(*i);
- }
- unconfirmed.add(accepting);
- unaccepted.remove(accepting);
- } else {
- if (unaccepted.contains(id)) {
- unaccepted.remove(id);
- unconfirmed.add(id);
- accepting.add(id);
- }
+ if (unaccepted.contains(id)) {
+ unaccepted.remove(id);
+ unconfirmed.add(id);
}
- return accepting;
}
void AcceptTracker::State::release()
@@ -70,18 +59,6 @@ void AcceptTracker::delivered(const std::string& destination, const qpid::framin
destinationState[destination].unaccepted.add(id);
}
-namespace
-{
-const size_t FLUSH_FREQUENCY = 1024;
-}
-
-void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record)
-{
- pending.push_back(record);
- if (pending.size() % FLUSH_FREQUENCY == 0) session.flush();
-}
-
-
void AcceptTracker::accept(qpid::client::AsyncSession& session)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
@@ -90,19 +67,20 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session)
Record record;
record.status = session.messageAccept(aggregateState.unaccepted);
record.accepted = aggregateState.unaccepted;
- addToPending(session, record);
+ pending.push_back(record);
aggregateState.accept();
}
-void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative)
+void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
- i->second.accept(id, cumulative);
+ i->second.accept(id);
}
Record record;
- record.accepted = aggregateState.accept(id, cumulative);
+ record.accepted.add(id);
record.status = session.messageAccept(record.accepted);
- addToPending(session, record);
+ pending.push_back(record);
+ aggregateState.accept(id);
}
void AcceptTracker::release(qpid::client::AsyncSession& session)
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
index 85209c3b87..87890e41cc 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
@@ -42,7 +42,7 @@ class AcceptTracker
public:
void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
void accept(qpid::client::AsyncSession&);
- void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative);
+ void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&);
void release(qpid::client::AsyncSession&);
uint32_t acceptsPending();
uint32_t acceptsPending(const std::string& destination);
@@ -62,7 +62,7 @@ class AcceptTracker
qpid::framing::SequenceSet unconfirmed;
void accept();
- qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative);
+ void accept(qpid::framing::SequenceNumber);
void release();
uint32_t acceptsPending();
void completed(qpid::framing::SequenceSet&);
@@ -79,7 +79,6 @@ class AcceptTracker
StateMap destinationState;
Records pending;
- void addToPending(qpid::client::AsyncSession&, const Record&);
void checkPending();
void completed(qpid::framing::SequenceSet&);
};
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index cea7eb0b51..f1295a3b66 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -233,8 +233,6 @@ class Subscription : public Exchange, public MessageSource
const bool reliable;
const bool durable;
const std::string actualType;
- const bool exclusiveQueue;
- const bool exclusiveSubscription;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
@@ -309,7 +307,6 @@ struct Opt
Opt& operator/(const std::string& name);
operator bool() const;
std::string str() const;
- bool asBool(bool defaultValue) const;
const Variant::List& asList() const;
void collect(qpid::framing::FieldTable& args) const;
@@ -341,12 +338,6 @@ Opt::operator bool() const
return value && !value->isVoid() && value->asBool();
}
-bool Opt::asBool(bool defaultValue) const
-{
- if (value) return value->asBool();
- else return defaultValue;
-}
-
std::string Opt::str() const
{
if (value) return value->asString();
@@ -499,9 +490,7 @@ Subscription::Subscription(const Address& address, const std::string& type)
queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
reliable(AddressResolution::is_reliable(address)),
durable(Opt(address)/LINK/DURABLE),
- actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
- exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
- exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
{
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -561,7 +550,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
checkAssert(session, FOR_RECEIVER);
//create subscription queue:
- session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
+ session.queueDeclare(arg::queue=queue, arg::exclusive=true,
arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
//'default' binding:
bindings.bind(session);
@@ -570,15 +559,15 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
linkBindings.bind(session);
//subscribe to subscription queue:
AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
- session.messageSubscribe(arg::queue=queue, arg::destination=destination,
- arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+ session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+ arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
}
void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
linkBindings.unbind(session);
session.messageCancel(destination);
- if (reliable) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
+ session.queueDelete(arg::queue=queue);
checkDelete(session, FOR_RECEIVER);
}
@@ -833,7 +822,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
FieldTable::ValuePtr v = result.getArguments().get(i->first);
if (!v) {
throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
- } else if (*i->second != *v) {
+ } else if (i->second != v) {
throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
% i->first % name % *(i->second) % *v).str());
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 3492938156..5a545c1f6a 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -39,18 +39,26 @@ using qpid::types::Variant;
using qpid::types::VAR_LIST;
using qpid::framing::Uuid;
-namespace {
-void merge(const std::string& value, std::vector<std::string>& list) {
- if (std::find(list.begin(), list.end(), value) == list.end())
- list.push_back(value);
+void convert(const Variant::List& from, std::vector<std::string>& to)
+{
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
+ to.push_back(i->asString());
+ }
}
-void merge(const Variant::List& from, std::vector<std::string>& to)
+template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value)
{
- for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i)
- merge(i->asString(), to);
+ Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ value = (T) i->second;
+ QPID_LOG(debug, "option " << key << " specified as " << i->second);
+ return true;
+ } else {
+ return false;
+ }
}
+namespace {
std::string asString(const std::vector<std::string>& v) {
std::stringstream os;
os << "[";
@@ -63,8 +71,49 @@ std::string asString(const std::vector<std::string>& v) {
}
}
+template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
+ const std::string& key,
+ std::vector<std::string>& value)
+{
+ Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ value.clear();
+ if (i->second.getType() == VAR_LIST) {
+ convert(i->second.asList(), value);
+ } else {
+ value.push_back(i->second.asString());
+ }
+ QPID_LOG(debug, "option " << key << " specified as " << asString(value));
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void convert(const Variant::Map& from, ConnectionSettings& to)
+{
+ setIfFound(from, "username", to.username);
+ setIfFound(from, "password", to.password);
+ setIfFound(from, "sasl-mechanism", to.mechanism);
+ setIfFound(from, "sasl-service", to.service);
+ setIfFound(from, "sasl-min-ssf", to.minSsf);
+ setIfFound(from, "sasl-max-ssf", to.maxSsf);
+
+ setIfFound(from, "heartbeat", to.heartbeat);
+ setIfFound(from, "tcp-nodelay", to.tcpNoDelay);
+
+ setIfFound(from, "locale", to.locale);
+ setIfFound(from, "max-channels", to.maxChannels);
+ setIfFound(from, "max-frame-size", to.maxFrameSize);
+ setIfFound(from, "bounds", to.bounds);
+
+ setIfFound(from, "transport", to.protocol);
+
+ setIfFound(from, "ssl-cert-name", to.sslCertName);
+}
+
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- replaceUrls(false), reconnect(false), timeout(-1), limit(-1),
+ reconnect(false), timeout(-1), limit(-1),
minReconnectInterval(3), maxReconnectInterval(60),
retries(0), reconnectOnLimitExceeded(true)
{
@@ -75,67 +124,27 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio
void ConnectionImpl::setOptions(const Variant::Map& options)
{
- for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
- setOption(i->first, i->second);
+ sys::Mutex::ScopedLock l(lock);
+ convert(options, settings);
+ setIfFound(options, "reconnect", reconnect);
+ setIfFound(options, "reconnect-timeout", timeout);
+ setIfFound(options, "reconnect-limit", limit);
+ int64_t reconnectInterval;
+ if (setIfFound(options, "reconnect-interval", reconnectInterval)) {
+ minReconnectInterval = maxReconnectInterval = reconnectInterval;
+ } else {
+ setIfFound(options, "reconnect-interval-min", minReconnectInterval);
+ setIfFound(options, "reconnect-interval-max", maxReconnectInterval);
}
+ setIfFound(options, "reconnect-urls", urls);
+ setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);
}
void ConnectionImpl::setOption(const std::string& name, const Variant& value)
{
- sys::Mutex::ScopedLock l(lock);
- if (name == "reconnect") {
- reconnect = value;
- } else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
- timeout = value;
- } else if (name == "reconnect-limit" || name == "reconnect_limit") {
- limit = value;
- } else if (name == "reconnect-interval" || name == "reconnect_interval") {
- maxReconnectInterval = minReconnectInterval = value;
- } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
- minReconnectInterval = value;
- } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
- maxReconnectInterval = value;
- } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
- replaceUrls = value.asBool();
- } else if (name == "reconnect-urls" || name == "reconnect_urls") {
- if (replaceUrls) urls.clear();
- if (value.getType() == VAR_LIST) {
- merge(value.asList(), urls);
- } else {
- merge(value.asString(), urls);
- }
- } else if (name == "username") {
- settings.username = value.asString();
- } else if (name == "password") {
- settings.password = value.asString();
- } else if (name == "sasl-mechanism" || name == "sasl_mechanism" ||
- name == "sasl-mechanisms" || name == "sasl_mechanisms") {
- settings.mechanism = value.asString();
- } else if (name == "sasl-service" || name == "sasl_service") {
- settings.service = value.asString();
- } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") {
- settings.minSsf = value;
- } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") {
- settings.maxSsf = value;
- } else if (name == "heartbeat") {
- settings.heartbeat = value;
- } else if (name == "tcp-nodelay" || name == "tcp_nodelay") {
- settings.tcpNoDelay = value;
- } else if (name == "locale") {
- settings.locale = value.asString();
- } else if (name == "max-channels" || name == "max_channels") {
- settings.maxChannels = value;
- } else if (name == "max-frame-size" || name == "max_frame_size") {
- settings.maxFrameSize = value;
- } else if (name == "bounds") {
- settings.bounds = value;
- } else if (name == "transport") {
- settings.protocol = value.asString();
- } else if (name == "ssl-cert-name" || name == "ssl_cert_name") {
- settings.sslCertName = value.asString();
- } else {
- throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
- }
+ Variant::Map options;
+ options[name] = value;
+ setOptions(options);
}
@@ -205,7 +214,7 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st
sessions[name] = impl;
break;
} catch (const qpid::TransportFailure&) {
- reopen();
+ open();
} catch (const qpid::SessionException& e) {
throw qpid::messaging::SessionError(e.what());
} catch (const std::exception& e) {
@@ -226,15 +235,6 @@ void ConnectionImpl::open()
catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); }
}
-void ConnectionImpl::reopen()
-{
- if (!reconnect) {
- throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
- }
- open();
-}
-
-
bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
{
if (timeout == 0) return true;
@@ -262,9 +262,14 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
}
void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
- for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i)
- merge(i->str(), urls);
- QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+ if (more.size()) {
+ for (size_t i = 0; i < more.size(); ++i) {
+ if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) {
+ urls.push_back(more[i].str());
+ }
+ }
+ QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+ }
}
bool ConnectionImpl::tryConnect()
@@ -284,7 +289,10 @@ bool ConnectionImpl::tryConnect()
QPID_LOG(info, "Connected to " << *i);
mergeUrls(connection.getInitialBrokers(), l);
return resetSessions(l);
- } catch (const qpid::TransportFailure& e) {
+ } catch (const qpid::ConnectionException& e) {
+ //TODO: need to fix timeout on
+ //qpid::client::Connection::open() so that it throws
+ //TransportFailure rather than a ConnectionException
QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
}
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 1b58cbbe3e..09f2038312 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -43,7 +43,6 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
public:
ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options);
void open();
- void reopen();
bool isOpen() const;
void close();
qpid::messaging::Session newSession(bool transactional, const std::string& name);
@@ -60,7 +59,6 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
Sessions sessions;
qpid::client::Connection connection;
- bool replaceUrls; // Replace rather than merging with reconnect-urls
std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
bool reconnect;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 5cf20c92eb..71e89bdba1 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -144,10 +144,10 @@ void IncomingMessages::accept()
acceptTracker.accept(session);
}
-void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative)
+void IncomingMessages::accept(qpid::framing::SequenceNumber id)
{
sys::Mutex::ScopedLock l(lock);
- acceptTracker.accept(id, session, cumulative);
+ acceptTracker.accept(id, session);
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index 9053b70312..f6a291bc68 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -72,7 +72,7 @@ class IncomingMessages
bool get(Handler& handler, qpid::sys::Duration timeout);
bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
void accept();
- void accept(qpid::framing::SequenceNumber id, bool cumulative);
+ void accept(qpid::framing::SequenceNumber id);
void releaseAll();
void releasePending(const std::string& destination);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index d93416da75..82358961c8 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -59,9 +59,7 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
- if (from.getTtl().getMilliseconds()) {
- message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
- }
+ message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
if (from.getDurable()) {
message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT);
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index f2f0f1a9e5..e1b75ec0cf 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -135,7 +135,6 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
void SenderImpl::replay(const sys::Mutex::ScopedLock&)
{
for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
- i->message.setRedelivered(true);
sink->send(session, name, *i);
}
}
@@ -148,7 +147,7 @@ uint32_t SenderImpl::checkPendingSends(bool flush) {
uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&)
{
if (flush) {
- session.flush();
+ session.flush();
flushed = true;
} else {
flushed = false;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index e6255dcd6f..75a71997fd 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -112,14 +112,13 @@ void SessionImpl::release(qpid::messaging::Message& m)
execute1<Release>(m);
}
-void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative)
+void SessionImpl::acknowledge(qpid::messaging::Message& m)
{
//Should probably throw an exception on failure here, or indicate
//it through a return type at least. Failure means that the
//message may be redelivered; i.e. the application cannot delete
//any state necessary for preventing reprocessing of the message
- Acknowledge2 ack(*this, m, cumulative);
- execute(ack);
+ execute1<Acknowledge1>(m);
}
void SessionImpl::close()
@@ -432,11 +431,8 @@ uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination)
void SessionImpl::syncImpl(bool block)
{
- {
- ScopedLock l(lock);
- if (block) session.sync();
- else session.flush();
- }
+ if (block) session.sync();
+ else session.flush();
//cleanup unconfirmed accept records:
incoming.pendingAccept();
}
@@ -471,10 +467,10 @@ void SessionImpl::acknowledgeImpl()
if (!transactional) incoming.accept();
}
-void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative)
+void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m)
{
ScopedLock l(lock);
- if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative);
+ if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId());
}
void SessionImpl::rejectImpl(qpid::messaging::Message& m)
@@ -513,7 +509,7 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection->reopen();
+ connection->open();
}
bool SessionImpl::backoff()
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index c7dea77d18..2a2aa47df6 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -63,7 +63,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
void acknowledge(bool sync);
void reject(qpid::messaging::Message&);
void release(qpid::messaging::Message&);
- void acknowledge(qpid::messaging::Message& msg, bool cumulative);
+ void acknowledge(qpid::messaging::Message& msg);
void close();
void sync(bool block);
qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
@@ -139,7 +139,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
void commitImpl();
void rollbackImpl();
void acknowledgeImpl();
- void acknowledgeImpl(qpid::messaging::Message&, bool cumulative);
+ void acknowledgeImpl(qpid::messaging::Message&);
void rejectImpl(qpid::messaging::Message&);
void releaseImpl(qpid::messaging::Message&);
void closeImpl();
@@ -204,13 +204,12 @@ class SessionImpl : public qpid::messaging::SessionImpl
void operator()() { impl.releaseImpl(message); }
};
- struct Acknowledge2 : Command
+ struct Acknowledge1 : Command
{
qpid::messaging::Message& message;
- bool cumulative;
- Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {}
- void operator()() { impl.acknowledgeImpl(message, cumulative); }
+ Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
+ void operator()() { impl.acknowledgeImpl(message); }
};
struct CreateSender;