summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-03-22 13:39:14 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-03-22 13:39:14 +0000
commit3c2954782cc6d27bacf8865cfaea9c71c2bfec2b (patch)
tree2f319b97e0c6d9e0052a8606f0e29d2b643834c2
parentb51489127365952db902579438294a587a7acf47 (diff)
downloadqpid-python-qpid-3890.tar.gz
QPID-3890: resync this branch to latest trunkqpid-3890
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3890@1303774 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/posix/QpiddBroker.cpp13
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp84
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h16
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterTimer.cpp2
-rw-r--r--qpid/cpp/src/qpid/log/posix/SinkOptions.cpp8
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp10
-rw-r--r--qpid/cpp/src/qpid/store/MessageStorePlugin.cpp6
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp8
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp8
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp8
-rw-r--r--qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp5
-rw-r--r--qpid/cpp/src/qpid/sys/ssl/SslIo.cpp7
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp25
-rw-r--r--qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp5
-rw-r--r--qpid/cpp/src/qpid/sys/windows/Socket.cpp10
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp8
-rw-r--r--qpid/cpp/src/qpidd.cpp4
-rw-r--r--qpid/cpp/src/qpidd.h4
-rw-r--r--qpid/cpp/src/tests/RefCounted.cpp6
-rw-r--r--qpid/cpp/src/tests/StringUtils.cpp6
-rwxr-xr-xqpid/cpp/src/tests/cli_tests.py3
-rw-r--r--qpid/cpp/src/tests/qpid-ping.cpp27
-rw-r--r--qpid/cpp/src/tests/qpid-send.cpp39
-rw-r--r--qpid/cpp/src/tests/testagent.cpp11
-rw-r--r--qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp6
-rw-r--r--qpid/cpp/src/windows/QpiddBroker.cpp8
-rw-r--r--qpid/cpp/src/windows/SCM.cpp664
-rw-r--r--qpid/java/build.deps3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java2
-rw-r--r--qpid/java/lib/mockito-all-1.9.0.jarbin0 -> 1495219 bytes
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java17
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py64
-rwxr-xr-xqpid/tools/src/py/qpid-config2
37 files changed, 663 insertions, 465 deletions
diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp
index 1cebcfc3ac..694751c27c 100644
--- a/qpid/cpp/src/posix/QpiddBroker.cpp
+++ b/qpid/cpp/src/posix/QpiddBroker.cpp
@@ -31,10 +31,11 @@
#include <unistd.h>
#include <sys/utsname.h>
-using namespace std;
-using namespace qpid;
-using qpid::broker::Broker;
-using qpid::broker::Daemon;
+using std::cout;
+using std::endl;
+
+namespace qpid {
+namespace broker {
BootstrapOptions::BootstrapOptions(const char* argv0)
: qpid::Options("Options"),
@@ -197,7 +198,9 @@ int QpiddBroker::execute (QpiddOptions *options) {
return 0;
}
+}} // namespace qpid::Broker
+
int main(int argc, char* argv[])
{
- return run_broker(argc, argv);
+ return qpid::broker::run_broker(argc, argv);
}
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index f183ff8e0c..3f854c7510 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -31,9 +31,11 @@
#include <fstream>
#include <boost/lexical_cast.hpp>
+namespace qpid {
+namespace management {
+
using namespace qpid::client;
using namespace qpid::framing;
-using namespace qpid::management;
using namespace qpid::sys;
using namespace std;
using std::stringstream;
@@ -1260,7 +1262,7 @@ void ManagementAgentImpl::ConnectionThread::run()
int totalSleep = 0;
do {
sys::Mutex::ScopedUnlock _unlock(connLock);
- ::sleep(delayMin);
+ qpid::sys::sleep(delayMin);
totalSleep += delayMin;
} while (totalSleep < delay && !shutdown);
sleeping = false;
@@ -1396,8 +1398,10 @@ void ManagementAgentImpl::PublishThread::run()
sleepTime = 1;
while (totalSleep < agent.getInterval() && !shutdown) {
- ::sleep(sleepTime);
+ qpid::sys::sleep(sleepTime);
totalSleep += sleepTime;
}
}
}
+
+}} \ No newline at end of file
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index f311b79578..ad688ba314 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -32,7 +32,9 @@
#include "qpid/sys/ExceptionHolder.h"
#include <stdexcept>
-using namespace qpid::broker;
+namespace qpid {
+namespace broker {
+
using namespace qpid::framing;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
@@ -408,3 +410,5 @@ bool Exchange::routeWithAlternate(Deliverable& msg)
}
return msg.delivered;
}
+
+}} \ No newline at end of file
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 7093a68d6c..40dfba39f4 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -131,12 +131,10 @@ uint32_t Message::getRequiredCredit()
void Message::encode(framing::Buffer& buffer) const
{
- {
- sys::Mutex::ScopedLock l(lock); // prevent header modifications while encoding
- //encode method and header frames
- EncodeFrame f1(buffer);
- frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
- }
+ sys::Mutex::ScopedLock l(lock);
+ //encode method and header frames
+ EncodeFrame f1(buffer);
+ frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
//then encode the payload of each content frame
framing::EncodeBody f2(buffer);
@@ -145,6 +143,7 @@ void Message::encode(framing::Buffer& buffer) const
void Message::encodeContent(framing::Buffer& buffer) const
{
+ sys::Mutex::ScopedLock l(lock);
//encode the payload of each content frame
EncodeBody f2(buffer);
frames.map_if(f2, TypeFilter<CONTENT_BODY>());
@@ -157,6 +156,7 @@ uint32_t Message::encodedSize() const
uint32_t Message::encodedContentSize() const
{
+ sys::Mutex::ScopedLock l(lock);
return frames.getContentSize();
}
@@ -222,8 +222,9 @@ void Message::releaseContent()
store->stage(pmsg);
staged = true;
}
- //ensure required credit is cached before content frames are released
+ //ensure required credit and size is cached before content frames are released
getRequiredCredit();
+ contentSize();
//remove any content frames from the frameset
frames.remove(TypeFilter<CONTENT_BODY>());
setContentReleased();
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 5f450cd556..22253532cb 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -43,9 +43,18 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
+/** return an iterator to the message at position, or members.end() if not found */
+MessageGroupManager::GroupState::MessageFifo::iterator
+MessageGroupManager::GroupState::findMsg(const qpid::framing::SequenceNumber &position)
+{
+ MessageState mState(position);
+ MessageFifo::iterator found = std::lower_bound(members.begin(), members.end(), mState);
+ return (found->position == position) ? found : members.end();
+}
+
void MessageGroupManager::unFree( const GroupState& state )
{
- GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+ GroupFifo::iterator pos = freeGroups.find( state.members.front().position );
assert( pos != freeGroups.end() && pos->second == &state );
freeGroups.erase( pos );
}
@@ -60,8 +69,8 @@ void MessageGroupManager::disown( GroupState& state )
{
state.owner.clear();
assert(state.members.size());
- assert(freeGroups.find(state.members.front()) == freeGroups.end());
- freeGroups[state.members.front()] = &state;
+ assert(freeGroups.find(state.members.front().position) == freeGroups.end());
+ freeGroups[state.members.front().position] = &state;
}
MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm )
@@ -106,7 +115,8 @@ void MessageGroupManager::enqueued( const QueuedMessage& qm )
// @todo KAG optimization - store reference to group state in QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- state.members.push_back(qm.position);
+ GroupState::MessageState mState(qm.position);
+ state.members.push_back(mState);
uint32_t total = state.members.size();
QPID_LOG( trace, "group queue " << qName <<
": added message to group id=" << state.group << " total=" << total );
@@ -123,7 +133,9 @@ void MessageGroupManager::acquired( const QueuedMessage& qm )
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- assert(state.members.size()); // there are msgs present
+ GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+ assert(m != state.members.end());
+ m->acquired = true;
state.acquired += 1;
QPID_LOG( trace, "group queue " << qName <<
": acquired message in group id=" << state.group << " acquired=" << state.acquired );
@@ -137,6 +149,9 @@ void MessageGroupManager::requeued( const QueuedMessage& qm )
GroupState& state = findGroup(qm);
assert( state.acquired != 0 );
state.acquired -= 1;
+ GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+ assert(m != state.members.end());
+ m->acquired = false;
if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << state.owner << " released group id=" << state.group);
@@ -152,13 +167,17 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- assert( state.members.size() != 0 );
- assert( state.acquired != 0 );
- state.acquired -= 1;
+ GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+ assert(m != state.members.end());
+ if (m->acquired) {
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
+ }
- // likely to be at or near begin() if dequeued in order
+ // special case if qm is first (oldest) message in the group:
+ // may need to re-insert it back on the freeGroups list, as the index will change
bool reFreeNeeded = false;
- if (state.members.front() == qm.position) {
+ if (m == state.members.begin()) {
if (!state.owned()) {
// will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
// if on freelist, it is indexed by first member, which is about to be removed!
@@ -167,15 +186,7 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
}
state.members.pop_front();
} else {
- GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
- GroupState::PositionFifo::iterator end = state.members.end();
- while (pos != end) {
- if (*pos == qm.position) {
- state.members.erase(pos);
- break;
- }
- ++pos;
- }
+ state.members.erase(m);
}
uint32_t total = state.members.size();
@@ -220,11 +231,11 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
GroupState& group = findGroup(next);
if (!group.owned()) {
//TODO: make acquire more efficient when we already have the message in question
- if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head!
+ if (group.members.front().position == next.position && messages.acquire(next.position, next)) { // only take from head!
return true;
}
QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
- << "'s head message still pending. pos=" << group.members.front());
+ << "'s head message still pending. pos=" << group.members.front().position);
} else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
return true;
}
@@ -284,7 +295,7 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status) const
info[GROUP_TIMESTAMP] = 0;
if (g->second.members.size() != 0) {
QueuedMessage qm;
- if (messages.find(g->second.members.front(), qm) &&
+ if (messages.find(g->second.members.front().position, qm) &&
qm.payload &&
qm.payload->hasProperties<framing::DeliveryProperties>()) {
info[GROUP_TIMESTAMP] = qm.payload->getProperties<framing::DeliveryProperties>()->getTimestamp();
@@ -353,6 +364,7 @@ namespace {
const std::string GROUP_OWNER("owner");
const std::string GROUP_ACQUIRED_CT("acquired-ct");
const std::string GROUP_POSITIONS("positions");
+ const std::string GROUP_ACQUIRED_MSGS("acquired-msgs");
const std::string GROUP_STATE("group-state");
}
@@ -371,10 +383,14 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
group.setString(GROUP_OWNER, g->second.owner);
group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
framing::Array positions(TYPE_CODE_UINT32);
- for (GroupState::PositionFifo::const_iterator p = g->second.members.begin();
- p != g->second.members.end(); ++p)
- positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p )));
+ framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
+ for (GroupState::MessageFifo::const_iterator p = g->second.members.begin();
+ p != g->second.members.end(); ++p) {
+ positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position )));
+ acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired )));
+ }
group.setArray(GROUP_POSITIONS, positions);
+ group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
}
state.setArray(GROUP_STATE, groupState);
@@ -425,13 +441,25 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
qName << "\": position encoding error!");
return;
}
+ framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
+ ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
+ if (!ok || positions.count() != acquiredMsgs.count()) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ qName << "\": acquired flag encoding error!");
+ return;
+ }
+
+ Array::const_iterator a = acquiredMsgs.begin();
+ for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) {
+ GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>());
+ mState.acquired = (*a++)->getIntegerValue<bool>();
+ state.members.push_back(mState);
+ }
- for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
- state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
messageGroups[state.group] = state;
if (!state.owned()) {
assert(state.members.size());
- freeGroups[state.members.front()] = &messageGroups[state.group];
+ freeGroups[state.members.front().position] = &messageGroups[state.group];
}
}
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index f4bffc4760..340ebbc56a 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -45,19 +45,29 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
struct GroupState {
// note: update getState()/setState() when changing this object's state implementation
- typedef std::deque<framing::SequenceNumber> PositionFifo;
+
+ // track which messages are in this group, and if they have been acquired
+ struct MessageState {
+ qpid::framing::SequenceNumber position;
+ bool acquired;
+ MessageState() : acquired(false) {}
+ MessageState(const qpid::framing::SequenceNumber& p) : position(p), acquired(false) {}
+ bool operator<(const MessageState& b) const { return position < b.position; }
+ };
+ typedef std::deque<MessageState> MessageFifo;
std::string group; // group identifier
std::string owner; // consumer with outstanding acquired messages
uint32_t acquired; // count of outstanding acquired messages
- PositionFifo members; // msgs belonging to this group
+ MessageFifo members; // msgs belonging to this group, in enqueue order
GroupState() : acquired(0) {}
bool owned() const {return !owner.empty();}
+ MessageFifo::iterator findMsg(const qpid::framing::SequenceNumber &);
};
typedef sys::unordered_map<std::string, struct GroupState> GroupMap;
- typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+ typedef std::map<qpid::framing::SequenceNumber, struct GroupState *> GroupFifo;
GroupMap messageGroups; // index: group name
GroupFifo freeGroups; // ordered by oldest free msg
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 3bd9233791..fdd95ae3bd 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -56,7 +56,9 @@
#include <boost/intrusive_ptr.hpp>
-using namespace qpid::broker;
+namespace qpid {
+namespace broker {
+
using namespace qpid::sys;
using namespace qpid::framing;
using qpid::management::ManagementAgent;
@@ -1469,7 +1471,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask
Queue::shared_ptr queue;
AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
- : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {}
void fire()
{
@@ -1824,3 +1826,5 @@ void Queue::UsageBarrier::destroy()
parent.deleted = true;
while (count) parent.messageLock.wait();
}
+
+}} \ No newline at end of file
diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
index b4f7d00f38..90e4fa9d4d 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
@@ -24,6 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/ClusterTimerWakeupBody.h"
#include "qpid/framing/ClusterTimerDropBody.h"
+#include "qpid/sys/ClusterSafe.h"
namespace qpid {
namespace cluster {
@@ -107,6 +108,7 @@ void ClusterTimer::drop(intrusive_ptr<TimerTask> t) {
// Deliver thread
void ClusterTimer::deliverWakeup(const std::string& name) {
QPID_LOG(trace, "Cluster timer wakeup delivered for " << name);
+ qpid::sys::assertClusterSafe();
Map::iterator i = map.find(name);
if (i == map.end())
throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name));
diff --git a/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp b/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp
index ffa7633e3b..8459938e5c 100644
--- a/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp
+++ b/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp
@@ -30,6 +30,10 @@
using std::string;
using qpid::Exception;
+namespace qpid {
+namespace log {
+namespace posix {
+
namespace {
// SyslogFacilities maps from syslog values to the text equivalents.
@@ -110,10 +114,6 @@ std::string basename(const std::string path) {
} // namespace
-namespace qpid {
-namespace log {
-namespace posix {
-
std::ostream& operator<<(std::ostream& o, const SyslogFacility& f) {
return o << SyslogFacilities().name(f.value);
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 8c2cb95faa..cb07d5d047 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -44,14 +44,15 @@
#include <sstream>
#include <typeinfo>
+namespace qpid {
+namespace management {
+
using boost::intrusive_ptr;
using qpid::framing::Uuid;
using qpid::types::Variant;
using qpid::amqp_0_10::MapCodec;
using qpid::amqp_0_10::ListCodec;
-using qpid::sys::Mutex;
using namespace qpid::framing;
-using namespace qpid::management;
using namespace qpid::broker;
using namespace qpid;
using namespace std;
@@ -2961,9 +2962,6 @@ bool ManagementAgent::moveDeletedObjectsLH() {
return !deleteList.empty();
}
-namespace qpid {
-namespace management {
-
namespace {
QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
}
@@ -2977,4 +2975,4 @@ const qpid::broker::ConnectionState* getManagementExecutionContext()
return executionContext;
}
-}}
+}} \ No newline at end of file
diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
index 2a8d971987..20231bf910 100644
--- a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
+++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -28,6 +28,9 @@
#include "qpid/DataDir.h"
#include "qpid/log/Statement.h"
+namespace qpid {
+namespace store {
+
/*
* The MessageStore pointer given to the Broker points to static storage.
* Thus, it cannot be deleted, especially by the broker. To prevent deletion,
@@ -42,9 +45,6 @@ namespace {
};
}
-namespace qpid {
-namespace store {
-
static MessageStorePlugin static_instance_registers_plugin;
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
index 14d63a4cd4..849a0a44e8 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
@@ -32,6 +32,10 @@
#include "MessageLog.h"
#include "Lsn.h"
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
namespace {
// Structures that hold log records. Each has a type field at the start.
@@ -97,10 +101,6 @@ struct MessageDequeue {
} // namespace
-namespace qpid {
-namespace store {
-namespace ms_clfs {
-
void
MessageLog::initialize()
{
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
index 04780e83e8..0ef046d7c8 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
@@ -33,6 +33,10 @@
#include "Transaction.h"
#include "Lsn.h"
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
namespace {
// Structures that hold log records. Each has a type field at the start.
@@ -95,10 +99,6 @@ struct TransactionDelete {
} // namespace
-namespace qpid {
-namespace store {
-namespace ms_clfs {
-
void
TransactionLog::initialize()
{
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index c25159985e..01ff8b6bfa 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -41,7 +41,9 @@
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
-using namespace qpid::sys;
+namespace qpid {
+namespace sys {
+namespace posix {
namespace {
@@ -71,10 +73,6 @@ __thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms
/*
* Asynch Acceptor
*/
-namespace qpid {
-namespace sys {
-namespace posix {
-
class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
public:
AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
diff --git a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
index b22a615a54..abff8a5be8 100644
--- a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
@@ -1,6 +1,3 @@
-#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP
-#define QPID_SYS_LINUX_POLLABLECONDITION_CPP
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -120,5 +117,3 @@ void PollableCondition::set() { impl->set(); }
void PollableCondition::clear() { impl->clear(); }
}} // namespace qpid::sys
-
-#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/
diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
index 73f15617dc..2a7cf16923 100644
--- a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
+++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
@@ -37,8 +37,9 @@
#include <boost/bind.hpp>
-using namespace qpid::sys;
-using namespace qpid::sys::ssl;
+namespace qpid {
+namespace sys {
+namespace ssl {
namespace {
@@ -448,3 +449,5 @@ SecuritySettings SslIO::getSecuritySettings() {
settings.authid = socket.getClientAuthId();
return settings;
}
+
+}}}
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 30378d4c5f..fb8df5ddf8 100644
--- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -295,6 +295,8 @@ private:
volatile bool queuedDelete;
// Socket close requested, but there are operations in progress.
volatile bool queuedClose;
+ // Most recent asynch read request
+ volatile AsynchReadResult* pendingRead;
private:
// Dispatch events that have completed.
@@ -374,6 +376,7 @@ AsynchIO::AsynchIO(const Socket& s,
writeInProgress(false),
queuedDelete(false),
queuedClose(false),
+ pendingRead(0),
working(false) {
}
@@ -504,6 +507,7 @@ void AsynchIO::startReading() {
}
}
// On status 0 or WSA_IO_PENDING, completion will handle the rest.
+ pendingRead = result;
}
else {
notifyBuffersEmpty();
@@ -617,16 +621,17 @@ void AsynchIO::readComplete(AsynchReadResult *result) {
int status = result->getStatus();
size_t bytes = result->getTransferred();
if (status == 0 && bytes > 0) {
- bool restartRead = true; // May not if receiver doesn't want more
if (readCallback)
readCallback(*this, result->getBuff());
- if (restartRead)
- startReading();
+ startReading();
}
else {
// No data read, so put the buffer back. It may be partially filled,
// so "unread" it back to the front of the queue.
unread(result->getBuff());
+ if (queuedClose && status == ERROR_OPERATION_ABORTED) {
+ return; // Expected reap from CancelIoEx
+ }
notifyEof();
if (status != 0)
{
@@ -697,8 +702,11 @@ void AsynchIO::completion(AsynchIoResult *result) {
{
ScopedUnlock<Mutex> ul(completionLock);
AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
- if (r != 0)
+ if (r != 0) {
readComplete(r);
+ // Set pendingRead to 0 if it's still pointing to (newly completed) r
+ InterlockedCompareExchangePointer((void * volatile *)&pendingRead, 0, r);
+ }
else {
AsynchWriteResult *w =
dynamic_cast<AsynchWriteResult*>(result);
@@ -732,6 +740,15 @@ void AsynchIO::completion(AsynchIoResult *result) {
else if (queuedDelete)
delete this;
}
+ else {
+ if (queuedClose && pendingRead) {
+ // Force outstanding read to completion. Layer above will
+ // call back.
+ CancelIoEx((HANDLE)toSocketHandle(socket),
+ ((AsynchReadResult *)pendingRead)->overlapped());
+ pendingRead = 0;
+ }
+ }
}
} // namespace windows
diff --git a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
index 6a1d9045b4..bb637be0a6 100644
--- a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
@@ -1,6 +1,3 @@
-#ifndef QPID_SYS_WINDOWS_POLLABLECONDITION_CPP
-#define QPID_SYS_WINDOWS_POLLABLECONDITION_CPP
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -110,5 +107,3 @@ void PollableCondition::clear() {
}
}} // namespace qpid::sys
-
-#endif /*!QPID_SYS_WINDOWS_POLLABLECONDITION_CPP*/
diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
index 1fa4768329..b085f67539 100644
--- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
@@ -32,6 +32,9 @@
#include <winsock2.h>
+namespace qpid {
+namespace sys {
+
// Need to initialize WinSock. Ideally, this would be a singleton or embedded
// in some one-time initialization function. I tried boost singleton and could
// not get it to compile (and others located in google had the same problem).
@@ -76,13 +79,6 @@ protected:
static WinSockSetup setup;
-} /* namespace */
-
-namespace qpid {
-namespace sys {
-
-namespace {
-
std::string getName(SOCKET fd, bool local)
{
::sockaddr_storage name_s; // big enough for any socket address
diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
index 11a3389e45..25cc94b290 100644
--- a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
@@ -38,6 +38,10 @@
#include <queue>
#include <boost/bind.hpp>
+namespace qpid {
+namespace sys {
+namespace windows {
+
namespace {
/*
@@ -66,10 +70,6 @@ namespace {
};
}
-namespace qpid {
-namespace sys {
-namespace windows {
-
SslAsynchIO::SslAsynchIO(const qpid::sys::Socket& s,
CredHandle hCred,
ReadCallback rCb,
diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp
index a0e329ca9d..b5686c6ab8 100644
--- a/qpid/cpp/src/qpidd.cpp
+++ b/qpid/cpp/src/qpidd.cpp
@@ -29,6 +29,9 @@
#include <memory>
using namespace std;
+namespace qpid {
+namespace broker {
+
auto_ptr<QpiddOptions> options;
// Broker real entry; various system-invoked entrypoints call here.
@@ -87,3 +90,4 @@ int run_broker(int argc, char *argv[], bool hidden)
}
return 1;
}
+}}
diff --git a/qpid/cpp/src/qpidd.h b/qpid/cpp/src/qpidd.h
index a3150a2737..f7f84d11da 100644
--- a/qpid/cpp/src/qpidd.h
+++ b/qpid/cpp/src/qpidd.h
@@ -26,6 +26,9 @@
#include <memory>
+namespace qpid {
+namespace broker {
+
// BootstrapOptions is a minimal subset of options used for a pre-parse
// of the command line to discover which plugin modules need to be loaded.
// The pre-parse is necessary because plugin modules may supply their own
@@ -70,4 +73,5 @@ public:
// Broker real entry; various system-invoked entrypoints call here.
int run_broker(int argc, char *argv[], bool hidden = false);
+}}
#endif /*!QPID_H*/
diff --git a/qpid/cpp/src/tests/RefCounted.cpp b/qpid/cpp/src/tests/RefCounted.cpp
index e4c1da5696..3ac3895322 100644
--- a/qpid/cpp/src/tests/RefCounted.cpp
+++ b/qpid/cpp/src/tests/RefCounted.cpp
@@ -21,15 +21,15 @@
#include "unit_test.h"
+namespace qpid {
+namespace tests {
+
QPID_AUTO_TEST_SUITE(RefCountedTestSuiteTestSuite)
using boost::intrusive_ptr;
using namespace std;
using namespace qpid;
-namespace qpid {
-namespace tests {
-
struct CountMe : public RefCounted {
static int instances;
CountMe() { ++instances; }
diff --git a/qpid/cpp/src/tests/StringUtils.cpp b/qpid/cpp/src/tests/StringUtils.cpp
index 6a19119288..c50287a4f4 100644
--- a/qpid/cpp/src/tests/StringUtils.cpp
+++ b/qpid/cpp/src/tests/StringUtils.cpp
@@ -23,9 +23,11 @@
#include "unit_test.h"
+namespace qpid {
+namespace tests {
+
QPID_AUTO_TEST_SUITE(StringUtilsTestSuite)
-using namespace qpid;
using std::string;
QPID_AUTO_TEST_CASE(testSplit_general)
@@ -75,3 +77,5 @@ QPID_AUTO_TEST_CASE(testSplit_empty)
}
QPID_AUTO_TEST_SUITE_END()
+
+}}
diff --git a/qpid/cpp/src/tests/cli_tests.py b/qpid/cpp/src/tests/cli_tests.py
index b9a7dda15c..7ac5b1deed 100755
--- a/qpid/cpp/src/tests/cli_tests.py
+++ b/qpid/cpp/src/tests/cli_tests.py
@@ -330,6 +330,9 @@ class CliTests(TestBase010):
ret = os.system(self.qpid_config_command(" add queue %s --alternate-exchange=%s" % (qName, altName)))
self.assertEqual(ret, 0)
+ ret = os.system(self.qpid_config_command(" queues"))
+ self.assertEqual(ret, 0)
+
queues = self.broker_access.getAllQueues()
found = False
for queue in queues:
diff --git a/qpid/cpp/src/tests/qpid-ping.cpp b/qpid/cpp/src/tests/qpid-ping.cpp
index 0cb4afa0ee..52331499e7 100644
--- a/qpid/cpp/src/tests/qpid-ping.cpp
+++ b/qpid/cpp/src/tests/qpid-ping.cpp
@@ -32,11 +32,20 @@
#include <string>
#include <iostream>
-using namespace std;
-using namespace qpid::sys;
-using namespace qpid::framing;
-using namespace qpid::client;
-using namespace qpid;
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::exception;
+using std::string;
+using namespace qpid::client::arg; // For keyword args
+using qpid::client::AsyncSession;
+using qpid::client::Connection;
+using qpid::client::Message;
+using qpid::client::SubscriptionManager;
+using qpid::framing::Uuid;
+
+namespace qpid {
+namespace tests {
struct PingOptions : public qpid::TestOptions {
int timeout; // Timeout in seconds.
@@ -48,9 +57,11 @@ struct PingOptions : public qpid::TestOptions {
}
};
+}} // namespace qpid::tests
+
int main(int argc, char** argv) {
try {
- PingOptions opts;
+ qpid::tests::PingOptions opts;
opts.parse(argc, argv);
opts.con.heartbeat = (opts.timeout+1)/2;
Connection connection;
@@ -58,8 +69,8 @@ int main(int argc, char** argv) {
if (!opts.quiet) cout << "Opened connection." << endl;
AsyncSession s = connection.newSession();
string qname(Uuid(true).str());
- s.queueDeclare(arg::queue=qname,arg::autoDelete=true,arg::exclusive=true);
- s.messageTransfer(arg::content=Message("hello", qname));
+ s.queueDeclare(queue=qname, autoDelete=true, exclusive=true);
+ s.messageTransfer(content=Message("hello", qname));
if (!opts.quiet) cout << "Sent message." << endl;
SubscriptionManager subs(s);
subs.get(qname);
diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp
index b1213a484f..91eef0cd71 100644
--- a/qpid/cpp/src/tests/qpid-send.cpp
+++ b/qpid/cpp/src/tests/qpid-send.cpp
@@ -36,15 +36,26 @@
#include <iostream>
#include <memory>
-using namespace std;
-using namespace qpid::messaging;
-using namespace qpid::types;
-
-typedef std::vector<std::string> string_vector;
+using std::string;
+using std::ios_base;
+
+using qpid::messaging::Address;
+using qpid::messaging::Connection;
+using qpid::messaging::Duration;
+using qpid::messaging::FailoverUpdates;
+using qpid::messaging::Message;
+using qpid::messaging::Receiver;
+using qpid::messaging::Session;
+using qpid::messaging::Sender;
+using qpid::types::Exception;
+using qpid::types::Uuid;
+using qpid::types::Variant;
namespace qpid {
namespace tests {
+typedef std::vector<std::string> string_vector;
+
struct Options : public qpid::Options
{
bool help;
@@ -223,10 +234,6 @@ const string EOS("eos");
const string SN("sn");
const string TS("ts");
-}} // namespace qpid::tests
-
-using namespace qpid::tests;
-
class ContentGenerator {
public:
virtual ~ContentGenerator() {}
@@ -329,6 +336,20 @@ public:
}
};
+}} // namespace qpid::tests
+
+using qpid::tests::Options;
+using qpid::tests::Reporter;
+using qpid::tests::Throughput;
+using qpid::tests::ContentGenerator;
+using qpid::tests::GroupGenerator;
+using qpid::tests::GetlineContentGenerator;
+using qpid::tests::MapContentGenerator;
+using qpid::tests::FixedContentGenerator;
+using qpid::tests::SN;
+using qpid::tests::TS;
+using qpid::tests::EOS;
+
int main(int argc, char ** argv)
{
Connection connection;
diff --git a/qpid/cpp/src/tests/testagent.cpp b/qpid/cpp/src/tests/testagent.cpp
index 98520b424a..e6010a8e00 100644
--- a/qpid/cpp/src/tests/testagent.cpp
+++ b/qpid/cpp/src/tests/testagent.cpp
@@ -36,9 +36,12 @@
#include <sstream>
+namespace qpid {
+namespace tests {
+
static bool running = true;
-using namespace std;
+using std::string;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -191,12 +194,14 @@ int main_int(int argc, char** argv)
return 0;
}
+}} // namespace qpid::tests
+
int main(int argc, char** argv)
{
try {
- return main_int(argc, argv);
+ return qpid::tests::main_int(argc, argv);
} catch(std::exception& e) {
- cerr << "Top Level Exception: " << e.what() << endl;
+ std::cerr << "Top Level Exception: " << e.what() << std::endl;
return 1;
}
}
diff --git a/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp b/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp
index 024f20b147..14f1e46606 100644
--- a/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp
+++ b/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp
@@ -32,7 +32,9 @@
#include <windows.h>
#include <iostream>
-namespace {
+namespace qpid {
+namespace tests {
+namespace windows {
// Instead of popping up a window for exceptions, just print something out
LONG _stdcall UnhandledExceptionFilter (PEXCEPTION_POINTERS pExceptionInfo)
@@ -73,4 +75,4 @@ redirect_errors_to_stderr::redirect_errors_to_stderr()
SetUnhandledExceptionFilter (&UnhandledExceptionFilter);
}
-} // namespace
+}}} // namespace
diff --git a/qpid/cpp/src/windows/QpiddBroker.cpp b/qpid/cpp/src/windows/QpiddBroker.cpp
index 42ba97bdb1..e73fcf0af5 100644
--- a/qpid/cpp/src/windows/QpiddBroker.cpp
+++ b/qpid/cpp/src/windows/QpiddBroker.cpp
@@ -32,7 +32,8 @@
#include <iostream>
#include <windows.h>
-using namespace qpid::broker;
+namespace qpid {
+namespace broker {
BootstrapOptions::BootstrapOptions(const char* argv0)
: qpid::Options("Options"),
@@ -451,6 +452,7 @@ int QpiddBroker::execute (QpiddOptions *options) {
return 0;
}
+}} // namespace qpid::broker
int main(int argc, char* argv[])
{
@@ -459,13 +461,13 @@ int main(int argc, char* argv[])
// the service is stopped.
SERVICE_TABLE_ENTRY dispatchTable[] =
{
- { "", (LPSERVICE_MAIN_FUNCTION)ServiceMain },
+ { "", (LPSERVICE_MAIN_FUNCTION)qpid::broker::ServiceMain },
{ NULL, NULL }
};
if (!StartServiceCtrlDispatcher(dispatchTable)) {
DWORD err = ::GetLastError();
if (err == ERROR_FAILED_SERVICE_CONTROLLER_CONNECT) // Run as console
- return run_broker(argc, argv);
+ return qpid::broker::run_broker(argc, argv);
throw QPID_WINDOWS_ERROR(err);
}
return 0;
diff --git a/qpid/cpp/src/windows/SCM.cpp b/qpid/cpp/src/windows/SCM.cpp
index 232bb04c17..2eeb143427 100644
--- a/qpid/cpp/src/windows/SCM.cpp
+++ b/qpid/cpp/src/windows/SCM.cpp
@@ -1,332 +1,332 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/log/Statement.h"
-#include "qpid/sys/windows/check.h"
-#include "SCM.h"
-
-#pragma comment(lib, "advapi32.lib")
-
-namespace {
-
-// Container that will close a SC_HANDLE upon destruction.
-class AutoServiceHandle {
-public:
- AutoServiceHandle(SC_HANDLE h_ = NULL) : h(h_) {}
- ~AutoServiceHandle() { if (h != NULL) ::CloseServiceHandle(h); }
- void release() { h = NULL; }
- void reset(SC_HANDLE newHandle)
- {
- if (h != NULL)
- ::CloseServiceHandle(h);
- h = newHandle;
- }
- operator SC_HANDLE() const { return h; }
-
-private:
- SC_HANDLE h;
-};
-
-}
-
-namespace qpid {
-namespace windows {
-
-SCM::SCM() : scmHandle(NULL)
-{
-}
-
-SCM::~SCM()
-{
- if (NULL != scmHandle)
- ::CloseServiceHandle(scmHandle);
-}
-
-/**
- * Install this executable as a service
- */
-void SCM::install(const string& serviceName,
- const string& serviceDesc,
- const string& args,
- DWORD startType,
- const string& account,
- const string& password,
- const string& depends)
-{
- // Handle dependent service name list; Windows wants a set of nul-separated
- // names ending with a double nul.
- string depends2 = depends;
- if (!depends2.empty()) {
- // CDL to null delimiter w/ trailing double null
- size_t p = 0;
- while ((p = depends2.find_first_of( ',', p)) != string::npos)
- depends2.replace(p, 1, 1, '\0');
- depends2.push_back('\0');
- depends2.push_back('\0');
- }
-
-#if 0
- // I'm nervous about adding a user/password check here. Is this a
- // potential attack vector, letting users check passwords without
- // control? -Steve Huston, Feb 24, 2011
-
- // Validate account, password
- HANDLE hToken = NULL;
- bool logStatus = false;
- if (!account.empty() && !password.empty() &&
- !(logStatus = ::LogonUserA(account.c_str(),
- "",
- password.c_str(),
- LOGON32_LOGON_NETWORK,
- LOGON32_PROVIDER_DEFAULT,
- &hToken ) != 0))
- std::cout << "warning: supplied account & password failed with LogonUser." << std::endl;
- if (logStatus)
- ::CloseHandle(hToken);
-#endif
-
- // Get fully qualified .exe name
- char myPath[MAX_PATH];
- DWORD myPathLength = ::GetModuleFileName(NULL, myPath, MAX_PATH);
- QPID_WINDOWS_CHECK_NOT(myPathLength, 0);
- string imagePath(myPath, myPathLength);
- if (!args.empty())
- imagePath += " " + args;
-
- // Ensure there's a handle to the SCM database.
- openSvcManager();
-
- // Create the service
- SC_HANDLE svcHandle;
- svcHandle = ::CreateService(scmHandle, // SCM database
- serviceName.c_str(), // name of service
- serviceDesc.c_str(), // name to display
- SERVICE_ALL_ACCESS, // desired access
- SERVICE_WIN32_OWN_PROCESS, // service type
- startType, // start type
- SERVICE_ERROR_NORMAL, // error cntrl type
- imagePath.c_str(), // path to service's binary w/ optional arguments
- NULL, // no load ordering group
- NULL, // no tag identifier
- depends2.empty() ? NULL : depends2.c_str(),
- account.empty() ? NULL : account.c_str(), // account name, or NULL for LocalSystem
- password.empty() ? NULL : password.c_str()); // password, or NULL for none
- QPID_WINDOWS_CHECK_NULL(svcHandle);
- ::CloseServiceHandle(svcHandle);
- QPID_LOG(info, "Service installed successfully");
-}
-
-/**
- *
- */
-void SCM::uninstall(const string& serviceName)
-{
- // Ensure there's a handle to the SCM database.
- openSvcManager();
- AutoServiceHandle svc(::OpenService(scmHandle,
- serviceName.c_str(),
- DELETE));
- QPID_WINDOWS_CHECK_NULL((SC_HANDLE)svc);
- QPID_WINDOWS_CHECK_NOT(::DeleteService(svc), 0);
- QPID_LOG(info, "Service deleted successfully.");
-}
-
-/**
- * Attempt to start the service.
- */
-void SCM::start(const string& serviceName)
-{
- // Ensure we have a handle to the SCM database.
- openSvcManager();
-
- // Get a handle to the service.
- AutoServiceHandle svc(::OpenService(scmHandle,
- serviceName.c_str(),
- SERVICE_ALL_ACCESS));
- QPID_WINDOWS_CHECK_NULL(svc);
-
- // Check the status in case the service is not stopped.
- DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
- if (state == SERVICE_STOP_PENDING)
- throw qpid::Exception("Timed out waiting for running service to stop.");
-
- // Attempt to start the service.
- QPID_WINDOWS_CHECK_NOT(::StartService(svc, 0, NULL), 0);
-
- QPID_LOG(info, "Service start pending...");
-
- // Check the status until the service is no longer start pending.
- state = waitForStateChangeFrom(svc, SERVICE_START_PENDING);
- // Determine whether the service is running.
- if (state == SERVICE_RUNNING) {
- QPID_LOG(info, "Service started successfully");
- }
- else {
- throw qpid::Exception(QPID_MSG("Service not yet running; state now " << state));
- }
-}
-
-/**
- *
- */
-void SCM::stop(const string& serviceName)
-{
- // Ensure a handle to the SCM database.
- openSvcManager();
-
- // Get a handle to the service.
- AutoServiceHandle svc(::OpenService(scmHandle,
- serviceName.c_str(),
- SERVICE_STOP | SERVICE_QUERY_STATUS |
- SERVICE_ENUMERATE_DEPENDENTS));
- QPID_WINDOWS_CHECK_NULL(svc);
-
- // Make sure the service is not already stopped; if it's stop-pending,
- // wait for it to finalize.
- DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
- if (state == SERVICE_STOPPED) {
- QPID_LOG(info, "Service is already stopped");
- return;
- }
-
- // If the service is running, dependencies must be stopped first.
- std::auto_ptr<ENUM_SERVICE_STATUS> deps;
- DWORD numDeps = getDependentServices(svc, deps);
- for (DWORD i = 0; i < numDeps; i++)
- stop(deps.get()[i].lpServiceName);
-
- // Dependents stopped; send a stop code to the service.
- SERVICE_STATUS_PROCESS ssp;
- if (!::ControlService(svc, SERVICE_CONTROL_STOP, (LPSERVICE_STATUS)&ssp))
- throw qpid::Exception(QPID_MSG("Stopping " << serviceName << ": " <<
- qpid::sys::strError(::GetLastError())));
-
- // Wait for the service to stop.
- state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
- if (state == SERVICE_STOPPED)
- QPID_LOG(info, QPID_MSG("Service " << serviceName <<
- " stopped successfully."));
-}
-
-/**
- *
- */
-void SCM::openSvcManager()
-{
- if (NULL != scmHandle)
- return;
-
- scmHandle = ::OpenSCManager(NULL, // local computer
- NULL, // ServicesActive database
- SC_MANAGER_ALL_ACCESS); // Rights
- QPID_WINDOWS_CHECK_NULL(scmHandle);
-}
-
-DWORD SCM::waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState)
-{
- SERVICE_STATUS_PROCESS ssStatus;
- DWORD bytesNeeded;
- DWORD waitTime;
- if (!::QueryServiceStatusEx(svc, // handle to service
- SC_STATUS_PROCESS_INFO, // information level
- (LPBYTE)&ssStatus, // address of structure
- sizeof(ssStatus), // size of structure
- &bytesNeeded)) // size needed if buffer is too small
- throw QPID_WINDOWS_ERROR(::GetLastError());
-
- // Save the tick count and initial checkpoint.
- DWORD startTickCount = ::GetTickCount();
- DWORD oldCheckPoint = ssStatus.dwCheckPoint;
-
- // Wait for the service to change out of the noted state.
- while (ssStatus.dwCurrentState == originalState) {
- // Do not wait longer than the wait hint. A good interval is
- // one-tenth of the wait hint but not less than 1 second
- // and not more than 10 seconds.
- waitTime = ssStatus.dwWaitHint / 10;
- if (waitTime < 1000)
- waitTime = 1000;
- else if (waitTime > 10000)
- waitTime = 10000;
-
- ::Sleep(waitTime);
-
- // Check the status until the service is no longer stop pending.
- if (!::QueryServiceStatusEx(svc,
- SC_STATUS_PROCESS_INFO,
- (LPBYTE) &ssStatus,
- sizeof(ssStatus),
- &bytesNeeded))
- throw QPID_WINDOWS_ERROR(::GetLastError());
-
- if (ssStatus.dwCheckPoint > oldCheckPoint) {
- // Continue to wait and check.
- startTickCount = ::GetTickCount();
- oldCheckPoint = ssStatus.dwCheckPoint;
- } else {
- if ((::GetTickCount() - startTickCount) > ssStatus.dwWaitHint)
- break;
- }
- }
- return ssStatus.dwCurrentState;
-}
-
-/**
- * Get the services that depend on @arg svc. All dependent service info
- * is returned in an array of ENUM_SERVICE_STATUS structures via @arg deps.
- *
- * @retval The number of dependent services.
- */
-DWORD SCM::getDependentServices(SC_HANDLE svc,
- std::auto_ptr<ENUM_SERVICE_STATUS>& deps)
-{
- DWORD bytesNeeded;
- DWORD numEntries;
-
- // Pass a zero-length buffer to get the required buffer size.
- if (::EnumDependentServices(svc,
- SERVICE_ACTIVE,
- 0,
- 0,
- &bytesNeeded,
- &numEntries)) {
- // If the Enum call succeeds, then there are no dependent
- // services, so do nothing.
- return 0;
- }
-
- if (::GetLastError() != ERROR_MORE_DATA)
- throw QPID_WINDOWS_ERROR((::GetLastError()));
-
- // Allocate a buffer for the dependencies.
- deps.reset((LPENUM_SERVICE_STATUS)(new char[bytesNeeded]));
- // Enumerate the dependencies.
- if (!::EnumDependentServices(svc,
- SERVICE_ACTIVE,
- deps.get(),
- bytesNeeded,
- &bytesNeeded,
- &numEntries))
- throw QPID_WINDOWS_ERROR((::GetLastError()));
- return numEntries;
-}
-
-} } // namespace qpid::windows
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/sys/windows/check.h"
+#include "SCM.h"
+
+#pragma comment(lib, "advapi32.lib")
+
+namespace qpid {
+namespace windows {
+
+namespace {
+
+// Container that will close a SC_HANDLE upon destruction.
+class AutoServiceHandle {
+public:
+ AutoServiceHandle(SC_HANDLE h_ = NULL) : h(h_) {}
+ ~AutoServiceHandle() { if (h != NULL) ::CloseServiceHandle(h); }
+ void release() { h = NULL; }
+ void reset(SC_HANDLE newHandle)
+ {
+ if (h != NULL)
+ ::CloseServiceHandle(h);
+ h = newHandle;
+ }
+ operator SC_HANDLE() const { return h; }
+
+private:
+ SC_HANDLE h;
+};
+
+}
+
+SCM::SCM() : scmHandle(NULL)
+{
+}
+
+SCM::~SCM()
+{
+ if (NULL != scmHandle)
+ ::CloseServiceHandle(scmHandle);
+}
+
+/**
+ * Install this executable as a service
+ */
+void SCM::install(const string& serviceName,
+ const string& serviceDesc,
+ const string& args,
+ DWORD startType,
+ const string& account,
+ const string& password,
+ const string& depends)
+{
+ // Handle dependent service name list; Windows wants a set of nul-separated
+ // names ending with a double nul.
+ string depends2 = depends;
+ if (!depends2.empty()) {
+ // CDL to null delimiter w/ trailing double null
+ size_t p = 0;
+ while ((p = depends2.find_first_of( ',', p)) != string::npos)
+ depends2.replace(p, 1, 1, '\0');
+ depends2.push_back('\0');
+ depends2.push_back('\0');
+ }
+
+#if 0
+ // I'm nervous about adding a user/password check here. Is this a
+ // potential attack vector, letting users check passwords without
+ // control? -Steve Huston, Feb 24, 2011
+
+ // Validate account, password
+ HANDLE hToken = NULL;
+ bool logStatus = false;
+ if (!account.empty() && !password.empty() &&
+ !(logStatus = ::LogonUserA(account.c_str(),
+ "",
+ password.c_str(),
+ LOGON32_LOGON_NETWORK,
+ LOGON32_PROVIDER_DEFAULT,
+ &hToken ) != 0))
+ std::cout << "warning: supplied account & password failed with LogonUser." << std::endl;
+ if (logStatus)
+ ::CloseHandle(hToken);
+#endif
+
+ // Get fully qualified .exe name
+ char myPath[MAX_PATH];
+ DWORD myPathLength = ::GetModuleFileName(NULL, myPath, MAX_PATH);
+ QPID_WINDOWS_CHECK_NOT(myPathLength, 0);
+ string imagePath(myPath, myPathLength);
+ if (!args.empty())
+ imagePath += " " + args;
+
+ // Ensure there's a handle to the SCM database.
+ openSvcManager();
+
+ // Create the service
+ SC_HANDLE svcHandle;
+ svcHandle = ::CreateService(scmHandle, // SCM database
+ serviceName.c_str(), // name of service
+ serviceDesc.c_str(), // name to display
+ SERVICE_ALL_ACCESS, // desired access
+ SERVICE_WIN32_OWN_PROCESS, // service type
+ startType, // start type
+ SERVICE_ERROR_NORMAL, // error cntrl type
+ imagePath.c_str(), // path to service's binary w/ optional arguments
+ NULL, // no load ordering group
+ NULL, // no tag identifier
+ depends2.empty() ? NULL : depends2.c_str(),
+ account.empty() ? NULL : account.c_str(), // account name, or NULL for LocalSystem
+ password.empty() ? NULL : password.c_str()); // password, or NULL for none
+ QPID_WINDOWS_CHECK_NULL(svcHandle);
+ ::CloseServiceHandle(svcHandle);
+ QPID_LOG(info, "Service installed successfully");
+}
+
+/**
+ *
+ */
+void SCM::uninstall(const string& serviceName)
+{
+ // Ensure there's a handle to the SCM database.
+ openSvcManager();
+ AutoServiceHandle svc(::OpenService(scmHandle,
+ serviceName.c_str(),
+ DELETE));
+ QPID_WINDOWS_CHECK_NULL((SC_HANDLE)svc);
+ QPID_WINDOWS_CHECK_NOT(::DeleteService(svc), 0);
+ QPID_LOG(info, "Service deleted successfully.");
+}
+
+/**
+ * Attempt to start the service.
+ */
+void SCM::start(const string& serviceName)
+{
+ // Ensure we have a handle to the SCM database.
+ openSvcManager();
+
+ // Get a handle to the service.
+ AutoServiceHandle svc(::OpenService(scmHandle,
+ serviceName.c_str(),
+ SERVICE_ALL_ACCESS));
+ QPID_WINDOWS_CHECK_NULL(svc);
+
+ // Check the status in case the service is not stopped.
+ DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+ if (state == SERVICE_STOP_PENDING)
+ throw qpid::Exception("Timed out waiting for running service to stop.");
+
+ // Attempt to start the service.
+ QPID_WINDOWS_CHECK_NOT(::StartService(svc, 0, NULL), 0);
+
+ QPID_LOG(info, "Service start pending...");
+
+ // Check the status until the service is no longer start pending.
+ state = waitForStateChangeFrom(svc, SERVICE_START_PENDING);
+ // Determine whether the service is running.
+ if (state == SERVICE_RUNNING) {
+ QPID_LOG(info, "Service started successfully");
+ }
+ else {
+ throw qpid::Exception(QPID_MSG("Service not yet running; state now " << state));
+ }
+}
+
+/**
+ *
+ */
+void SCM::stop(const string& serviceName)
+{
+ // Ensure a handle to the SCM database.
+ openSvcManager();
+
+ // Get a handle to the service.
+ AutoServiceHandle svc(::OpenService(scmHandle,
+ serviceName.c_str(),
+ SERVICE_STOP | SERVICE_QUERY_STATUS |
+ SERVICE_ENUMERATE_DEPENDENTS));
+ QPID_WINDOWS_CHECK_NULL(svc);
+
+ // Make sure the service is not already stopped; if it's stop-pending,
+ // wait for it to finalize.
+ DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+ if (state == SERVICE_STOPPED) {
+ QPID_LOG(info, "Service is already stopped");
+ return;
+ }
+
+ // If the service is running, dependencies must be stopped first.
+ std::auto_ptr<ENUM_SERVICE_STATUS> deps;
+ DWORD numDeps = getDependentServices(svc, deps);
+ for (DWORD i = 0; i < numDeps; i++)
+ stop(deps.get()[i].lpServiceName);
+
+ // Dependents stopped; send a stop code to the service.
+ SERVICE_STATUS_PROCESS ssp;
+ if (!::ControlService(svc, SERVICE_CONTROL_STOP, (LPSERVICE_STATUS)&ssp))
+ throw qpid::Exception(QPID_MSG("Stopping " << serviceName << ": " <<
+ qpid::sys::strError(::GetLastError())));
+
+ // Wait for the service to stop.
+ state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+ if (state == SERVICE_STOPPED)
+ QPID_LOG(info, QPID_MSG("Service " << serviceName <<
+ " stopped successfully."));
+}
+
+/**
+ *
+ */
+void SCM::openSvcManager()
+{
+ if (NULL != scmHandle)
+ return;
+
+ scmHandle = ::OpenSCManager(NULL, // local computer
+ NULL, // ServicesActive database
+ SC_MANAGER_ALL_ACCESS); // Rights
+ QPID_WINDOWS_CHECK_NULL(scmHandle);
+}
+
+DWORD SCM::waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState)
+{
+ SERVICE_STATUS_PROCESS ssStatus;
+ DWORD bytesNeeded;
+ DWORD waitTime;
+ if (!::QueryServiceStatusEx(svc, // handle to service
+ SC_STATUS_PROCESS_INFO, // information level
+ (LPBYTE)&ssStatus, // address of structure
+ sizeof(ssStatus), // size of structure
+ &bytesNeeded)) // size needed if buffer is too small
+ throw QPID_WINDOWS_ERROR(::GetLastError());
+
+ // Save the tick count and initial checkpoint.
+ DWORD startTickCount = ::GetTickCount();
+ DWORD oldCheckPoint = ssStatus.dwCheckPoint;
+
+ // Wait for the service to change out of the noted state.
+ while (ssStatus.dwCurrentState == originalState) {
+ // Do not wait longer than the wait hint. A good interval is
+ // one-tenth of the wait hint but not less than 1 second
+ // and not more than 10 seconds.
+ waitTime = ssStatus.dwWaitHint / 10;
+ if (waitTime < 1000)
+ waitTime = 1000;
+ else if (waitTime > 10000)
+ waitTime = 10000;
+
+ ::Sleep(waitTime);
+
+ // Check the status until the service is no longer stop pending.
+ if (!::QueryServiceStatusEx(svc,
+ SC_STATUS_PROCESS_INFO,
+ (LPBYTE) &ssStatus,
+ sizeof(ssStatus),
+ &bytesNeeded))
+ throw QPID_WINDOWS_ERROR(::GetLastError());
+
+ if (ssStatus.dwCheckPoint > oldCheckPoint) {
+ // Continue to wait and check.
+ startTickCount = ::GetTickCount();
+ oldCheckPoint = ssStatus.dwCheckPoint;
+ } else {
+ if ((::GetTickCount() - startTickCount) > ssStatus.dwWaitHint)
+ break;
+ }
+ }
+ return ssStatus.dwCurrentState;
+}
+
+/**
+ * Get the services that depend on @arg svc. All dependent service info
+ * is returned in an array of ENUM_SERVICE_STATUS structures via @arg deps.
+ *
+ * @retval The number of dependent services.
+ */
+DWORD SCM::getDependentServices(SC_HANDLE svc,
+ std::auto_ptr<ENUM_SERVICE_STATUS>& deps)
+{
+ DWORD bytesNeeded;
+ DWORD numEntries;
+
+ // Pass a zero-length buffer to get the required buffer size.
+ if (::EnumDependentServices(svc,
+ SERVICE_ACTIVE,
+ 0,
+ 0,
+ &bytesNeeded,
+ &numEntries)) {
+ // If the Enum call succeeds, then there are no dependent
+ // services, so do nothing.
+ return 0;
+ }
+
+ if (::GetLastError() != ERROR_MORE_DATA)
+ throw QPID_WINDOWS_ERROR((::GetLastError()));
+
+ // Allocate a buffer for the dependencies.
+ deps.reset((LPENUM_SERVICE_STATUS)(new char[bytesNeeded]));
+ // Enumerate the dependencies.
+ if (!::EnumDependentServices(svc,
+ SERVICE_ACTIVE,
+ deps.get(),
+ bytesNeeded,
+ &bytesNeeded,
+ &numEntries))
+ throw QPID_WINDOWS_ERROR((::GetLastError()));
+ return numEntries;
+}
+
+} } // namespace qpid::windows
diff --git a/qpid/java/build.deps b/qpid/java/build.deps
index fe0ca6362b..ec9eacb169 100644
--- a/qpid/java/build.deps
+++ b/qpid/java/build.deps
@@ -35,6 +35,7 @@ geronimo-kernel=lib/geronimo-kernel-2.2.1.jar
geronimo-openejb=lib/geronimo-ejb_3.0_spec-1.0.1.jar
junit=lib/junit-3.8.1.jar
+mockito-all=lib/mockito-all-1.9.0.jar
log4j=lib/log4j-1.2.12.jar
@@ -58,7 +59,7 @@ broker.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \
broker-plugins.libs=${felix.libs} ${log4j} ${commons-configuration.libs}
-junit-toolkit.libs=${log4j} ${junit} ${slf4j-api}
+junit-toolkit.libs=${log4j} ${junit} ${slf4j-api} ${mockito-all}
test.libs=${slf4j-log4j} ${junit-toolkit.libs}
ibm-icu=lib/com.ibm.icu_3.8.1.v20080530.jar
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 56ee56d178..a1a06c5547 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -327,6 +327,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
}
+ _conn.setClosed();
+
ExceptionListener listener = _conn.getExceptionListenerNoCheck();
if (listener == null)
{
diff --git a/qpid/java/lib/mockito-all-1.9.0.jar b/qpid/java/lib/mockito-all-1.9.0.jar
new file mode 100644
index 0000000000..273fd50feb
--- /dev/null
+++ b/qpid/java/lib/mockito-all-1.9.0.jar
Binary files differ
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
index 6b83929258..5b3bca7033 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.management.jmx.ManagedConnectionMBeanTest;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.transport.ConnectionException;
@@ -62,10 +63,13 @@ public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase
{
final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class;
+ assertConnectionOpen();
+
stopBroker();
JMSException exception = _recordingExceptionListener.awaitException(10000);
assertConnectionCloseWasReported(exception, expectedLinkedException);
+ assertConnectionClosed();
ensureCanCloseWithoutException();
}
@@ -79,10 +83,13 @@ public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase
return;
}
+ assertConnectionOpen();
+
killBroker();
JMSException exception = _recordingExceptionListener.awaitException(10000);
assertConnectionCloseWasReported(exception, expectedLinkedException);
+ assertConnectionClosed();
ensureCanCloseWithoutException();
}
@@ -107,6 +114,16 @@ public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase
assertEquals("Unexpected linked exception", linkedExceptionClass, exception.getLinkedException().getClass());
}
+ private void assertConnectionClosed()
+ {
+ assertTrue("Connection should be marked as closed", ((AMQConnection)_connection).isClosed());
+ }
+
+ private void assertConnectionOpen()
+ {
+ assertFalse("Connection should not be marked as closed", ((AMQConnection)_connection).isClosed());
+ }
+
private final class RecordingExceptionListener implements ExceptionListener
{
private final CountDownLatch _exceptionReceivedLatch = new CountDownLatch(1);
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
index 18fb0a61a3..ace7611a2f 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
@@ -1122,6 +1122,70 @@ class MultiConsumerMsgGroupTests(Base):
snd.close()
+ def test_ttl_expire(self):
+ """ Verify that expired (TTL) group messages are skipped correctly
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","B","C","A","B","C"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ if m.properties['THE-GROUP'] == 'B':
+ m.ttl = 1;
+ snd.send(m)
+
+ sleep(2) # let all B's expire
+
+ # create consumers on separate sessions: C1,C2
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":0})
+
+ # C1 should acquire A-0, then C2 should acquire C-2, Group B should
+ # expire and never be fetched
+
+ m1 = c1.fetch(0);
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ m2 = c2.fetch(0);
+ assert m2.properties['THE-GROUP'] == 'C'
+ assert m2.content['index'] == 2
+
+ m1 = c1.fetch(0);
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 3
+
+ m2 = c2.fetch(0);
+ assert m2.properties['THE-GROUP'] == 'C'
+ assert m2.content['index'] == 5
+
+ # there should be no more left for either consumer
+ try:
+ mx = c1.fetch(0)
+ assert False # should never get here
+ except Empty:
+ pass
+ try:
+ mx = c2.fetch(0)
+ assert False # should never get here
+ except Empty:
+ pass
+
+ c1.session.acknowledge()
+ c2.session.acknowledge()
+ c1.close()
+ c2.close()
+ snd.close()
+
+
class StickyConsumerMsgGroupTests(Base):
"""
Tests for the behavior of sticky-consumer message groups. These tests
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index 896ae89faf..1308df765d 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -481,7 +481,7 @@ class BrokerManager:
if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY],
if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION],
if q.altExchange:
- print "--alternate-exchange=%s" % q._altExchange_.name,
+ print "--alternate-exchange=%s" % q.altExchange,
if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE],
if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],