From 3c2954782cc6d27bacf8865cfaea9c71c2bfec2b Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Thu, 22 Mar 2012 13:39:14 +0000 Subject: QPID-3890: resync this branch to latest trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3890@1303774 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/posix/QpiddBroker.cpp | 13 +- qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 10 +- qpid/cpp/src/qpid/broker/Exchange.cpp | 6 +- qpid/cpp/src/qpid/broker/Message.cpp | 15 +- qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 84 ++- qpid/cpp/src/qpid/broker/MessageGroupManager.h | 16 +- qpid/cpp/src/qpid/broker/Queue.cpp | 8 +- qpid/cpp/src/qpid/cluster/ClusterTimer.cpp | 2 + qpid/cpp/src/qpid/log/posix/SinkOptions.cpp | 8 +- qpid/cpp/src/qpid/management/ManagementAgent.cpp | 10 +- qpid/cpp/src/qpid/store/MessageStorePlugin.cpp | 6 +- qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp | 8 +- qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp | 8 +- qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 8 +- qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp | 5 - qpid/cpp/src/qpid/sys/ssl/SslIo.cpp | 7 +- qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp | 25 +- .../cpp/src/qpid/sys/windows/PollableCondition.cpp | 5 - qpid/cpp/src/qpid/sys/windows/Socket.cpp | 10 +- qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp | 8 +- qpid/cpp/src/qpidd.cpp | 4 + qpid/cpp/src/qpidd.h | 4 + qpid/cpp/src/tests/RefCounted.cpp | 6 +- qpid/cpp/src/tests/StringUtils.cpp | 6 +- qpid/cpp/src/tests/cli_tests.py | 3 + qpid/cpp/src/tests/qpid-ping.cpp | 27 +- qpid/cpp/src/tests/qpid-send.cpp | 39 +- qpid/cpp/src/tests/testagent.cpp | 11 +- .../src/tests/windows/DisableWin32ErrorWindows.cpp | 6 +- qpid/cpp/src/windows/QpiddBroker.cpp | 8 +- qpid/cpp/src/windows/SCM.cpp | 664 ++++++++++----------- qpid/java/build.deps | 3 +- .../qpid/client/AMQConnectionDelegate_0_10.java | 2 + qpid/java/lib/mockito-all-1.9.0.jar | Bin 0 -> 1495219 bytes .../BrokerClosesClientConnectionTest.java | 17 + .../src/py/qpid_tests/broker_0_10/msg_groups.py | 64 ++ qpid/tools/src/py/qpid-config | 2 +- 37 files changed, 663 insertions(+), 465 deletions(-) create mode 100644 qpid/java/lib/mockito-all-1.9.0.jar diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp index 1cebcfc3ac..694751c27c 100644 --- a/qpid/cpp/src/posix/QpiddBroker.cpp +++ b/qpid/cpp/src/posix/QpiddBroker.cpp @@ -31,10 +31,11 @@ #include #include -using namespace std; -using namespace qpid; -using qpid::broker::Broker; -using qpid::broker::Daemon; +using std::cout; +using std::endl; + +namespace qpid { +namespace broker { BootstrapOptions::BootstrapOptions(const char* argv0) : qpid::Options("Options"), @@ -197,7 +198,9 @@ int QpiddBroker::execute (QpiddOptions *options) { return 0; } +}} // namespace qpid::Broker + int main(int argc, char* argv[]) { - return run_broker(argc, argv); + return qpid::broker::run_broker(argc, argv); } diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index f183ff8e0c..3f854c7510 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -31,9 +31,11 @@ #include #include +namespace qpid { +namespace management { + using namespace qpid::client; using namespace qpid::framing; -using namespace qpid::management; using namespace qpid::sys; using namespace std; using std::stringstream; @@ -1260,7 +1262,7 @@ void ManagementAgentImpl::ConnectionThread::run() int totalSleep = 0; do { sys::Mutex::ScopedUnlock _unlock(connLock); - ::sleep(delayMin); + qpid::sys::sleep(delayMin); totalSleep += delayMin; } while (totalSleep < delay && !shutdown); sleeping = false; @@ -1396,8 +1398,10 @@ void ManagementAgentImpl::PublishThread::run() sleepTime = 1; while (totalSleep < agent.getInterval() && !shutdown) { - ::sleep(sleepTime); + qpid::sys::sleep(sleepTime); totalSleep += sleepTime; } } } + +}} \ No newline at end of file diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index f311b79578..ad688ba314 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -32,7 +32,9 @@ #include "qpid/sys/ExceptionHolder.h" #include -using namespace qpid::broker; +namespace qpid { +namespace broker { + using namespace qpid::framing; using qpid::framing::Buffer; using qpid::framing::FieldTable; @@ -408,3 +410,5 @@ bool Exchange::routeWithAlternate(Deliverable& msg) } return msg.delivered; } + +}} \ No newline at end of file diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 7093a68d6c..40dfba39f4 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -131,12 +131,10 @@ uint32_t Message::getRequiredCredit() void Message::encode(framing::Buffer& buffer) const { - { - sys::Mutex::ScopedLock l(lock); // prevent header modifications while encoding - //encode method and header frames - EncodeFrame f1(buffer); - frames.map_if(f1, TypeFilter2()); - } + sys::Mutex::ScopedLock l(lock); + //encode method and header frames + EncodeFrame f1(buffer); + frames.map_if(f1, TypeFilter2()); //then encode the payload of each content frame framing::EncodeBody f2(buffer); @@ -145,6 +143,7 @@ void Message::encode(framing::Buffer& buffer) const void Message::encodeContent(framing::Buffer& buffer) const { + sys::Mutex::ScopedLock l(lock); //encode the payload of each content frame EncodeBody f2(buffer); frames.map_if(f2, TypeFilter()); @@ -157,6 +156,7 @@ uint32_t Message::encodedSize() const uint32_t Message::encodedContentSize() const { + sys::Mutex::ScopedLock l(lock); return frames.getContentSize(); } @@ -222,8 +222,9 @@ void Message::releaseContent() store->stage(pmsg); staged = true; } - //ensure required credit is cached before content frames are released + //ensure required credit and size is cached before content frames are released getRequiredCredit(); + contentSize(); //remove any content frames from the frameset frames.remove(TypeFilter()); setContentReleased(); diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index 5f450cd556..22253532cb 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -43,9 +43,18 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); +/** return an iterator to the message at position, or members.end() if not found */ +MessageGroupManager::GroupState::MessageFifo::iterator +MessageGroupManager::GroupState::findMsg(const qpid::framing::SequenceNumber &position) +{ + MessageState mState(position); + MessageFifo::iterator found = std::lower_bound(members.begin(), members.end(), mState); + return (found->position == position) ? found : members.end(); +} + void MessageGroupManager::unFree( const GroupState& state ) { - GroupFifo::iterator pos = freeGroups.find( state.members.front() ); + GroupFifo::iterator pos = freeGroups.find( state.members.front().position ); assert( pos != freeGroups.end() && pos->second == &state ); freeGroups.erase( pos ); } @@ -60,8 +69,8 @@ void MessageGroupManager::disown( GroupState& state ) { state.owner.clear(); assert(state.members.size()); - assert(freeGroups.find(state.members.front()) == freeGroups.end()); - freeGroups[state.members.front()] = &state; + assert(freeGroups.find(state.members.front().position) == freeGroups.end()); + freeGroups[state.members.front().position] = &state; } MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm ) @@ -106,7 +115,8 @@ void MessageGroupManager::enqueued( const QueuedMessage& qm ) // @todo KAG optimization - store reference to group state in QueuedMessage // issue: const-ness?? GroupState& state = findGroup(qm); - state.members.push_back(qm.position); + GroupState::MessageState mState(qm.position); + state.members.push_back(mState); uint32_t total = state.members.size(); QPID_LOG( trace, "group queue " << qName << ": added message to group id=" << state.group << " total=" << total ); @@ -123,7 +133,9 @@ void MessageGroupManager::acquired( const QueuedMessage& qm ) // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? GroupState& state = findGroup(qm); - assert(state.members.size()); // there are msgs present + GroupState::MessageFifo::iterator m = state.findMsg(qm.position); + assert(m != state.members.end()); + m->acquired = true; state.acquired += 1; QPID_LOG( trace, "group queue " << qName << ": acquired message in group id=" << state.group << " acquired=" << state.acquired ); @@ -137,6 +149,9 @@ void MessageGroupManager::requeued( const QueuedMessage& qm ) GroupState& state = findGroup(qm); assert( state.acquired != 0 ); state.acquired -= 1; + GroupState::MessageFifo::iterator m = state.findMsg(qm.position); + assert(m != state.members.end()); + m->acquired = false; if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << ": consumer name=" << state.owner << " released group id=" << state.group); @@ -152,13 +167,17 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? GroupState& state = findGroup(qm); - assert( state.members.size() != 0 ); - assert( state.acquired != 0 ); - state.acquired -= 1; + GroupState::MessageFifo::iterator m = state.findMsg(qm.position); + assert(m != state.members.end()); + if (m->acquired) { + assert( state.acquired != 0 ); + state.acquired -= 1; + } - // likely to be at or near begin() if dequeued in order + // special case if qm is first (oldest) message in the group: + // may need to re-insert it back on the freeGroups list, as the index will change bool reFreeNeeded = false; - if (state.members.front() == qm.position) { + if (m == state.members.begin()) { if (!state.owned()) { // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! // if on freelist, it is indexed by first member, which is about to be removed! @@ -167,15 +186,7 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) } state.members.pop_front(); } else { - GroupState::PositionFifo::iterator pos = state.members.begin() + 1; - GroupState::PositionFifo::iterator end = state.members.end(); - while (pos != end) { - if (*pos == qm.position) { - state.members.erase(pos); - break; - } - ++pos; - } + state.members.erase(m); } uint32_t total = state.members.size(); @@ -220,11 +231,11 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued GroupState& group = findGroup(next); if (!group.owned()) { //TODO: make acquire more efficient when we already have the message in question - if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head! + if (group.members.front().position == next.position && messages.acquire(next.position, next)) { // only take from head! return true; } QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group - << "'s head message still pending. pos=" << group.members.front()); + << "'s head message still pending. pos=" << group.members.front().position); } else if (group.owner == c->getName() && messages.acquire(next.position, next)) { return true; } @@ -284,7 +295,7 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status) const info[GROUP_TIMESTAMP] = 0; if (g->second.members.size() != 0) { QueuedMessage qm; - if (messages.find(g->second.members.front(), qm) && + if (messages.find(g->second.members.front().position, qm) && qm.payload && qm.payload->hasProperties()) { info[GROUP_TIMESTAMP] = qm.payload->getProperties()->getTimestamp(); @@ -353,6 +364,7 @@ namespace { const std::string GROUP_OWNER("owner"); const std::string GROUP_ACQUIRED_CT("acquired-ct"); const std::string GROUP_POSITIONS("positions"); + const std::string GROUP_ACQUIRED_MSGS("acquired-msgs"); const std::string GROUP_STATE("group-state"); } @@ -371,10 +383,14 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const group.setString(GROUP_OWNER, g->second.owner); group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); framing::Array positions(TYPE_CODE_UINT32); - for (GroupState::PositionFifo::const_iterator p = g->second.members.begin(); - p != g->second.members.end(); ++p) - positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p ))); + framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); + for (GroupState::MessageFifo::const_iterator p = g->second.members.begin(); + p != g->second.members.end(); ++p) { + positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position ))); + acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired ))); + } group.setArray(GROUP_POSITIONS, positions); + group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); } state.setArray(GROUP_STATE, groupState); @@ -425,13 +441,25 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state) qName << "\": position encoding error!"); return; } + framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); + ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); + if (!ok || positions.count() != acquiredMsgs.count()) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": acquired flag encoding error!"); + return; + } + + Array::const_iterator a = acquiredMsgs.begin(); + for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) { + GroupState::MessageState mState((*p)->getIntegerValue()); + mState.acquired = (*a++)->getIntegerValue(); + state.members.push_back(mState); + } - for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) - state.members.push_back((*p)->getIntegerValue()); messageGroups[state.group] = state; if (!state.owned()) { assert(state.members.size()); - freeGroups[state.members.front()] = &messageGroups[state.group]; + freeGroups[state.members.front().position] = &messageGroups[state.group]; } } diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index f4bffc4760..340ebbc56a 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -45,19 +45,29 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu struct GroupState { // note: update getState()/setState() when changing this object's state implementation - typedef std::deque PositionFifo; + + // track which messages are in this group, and if they have been acquired + struct MessageState { + qpid::framing::SequenceNumber position; + bool acquired; + MessageState() : acquired(false) {} + MessageState(const qpid::framing::SequenceNumber& p) : position(p), acquired(false) {} + bool operator<(const MessageState& b) const { return position < b.position; } + }; + typedef std::deque MessageFifo; std::string group; // group identifier std::string owner; // consumer with outstanding acquired messages uint32_t acquired; // count of outstanding acquired messages - PositionFifo members; // msgs belonging to this group + MessageFifo members; // msgs belonging to this group, in enqueue order GroupState() : acquired(0) {} bool owned() const {return !owner.empty();} + MessageFifo::iterator findMsg(const qpid::framing::SequenceNumber &); }; typedef sys::unordered_map GroupMap; - typedef std::map GroupFifo; + typedef std::map GroupFifo; GroupMap messageGroups; // index: group name GroupFifo freeGroups; // ordered by oldest free msg diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 3bd9233791..fdd95ae3bd 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -56,7 +56,9 @@ #include -using namespace qpid::broker; +namespace qpid { +namespace broker { + using namespace qpid::sys; using namespace qpid::framing; using qpid::management::ManagementAgent; @@ -1469,7 +1471,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask Queue::shared_ptr queue; AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) - : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {} void fire() { @@ -1824,3 +1826,5 @@ void Queue::UsageBarrier::destroy() parent.deleted = true; while (count) parent.messageLock.wait(); } + +}} \ No newline at end of file diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp index b4f7d00f38..90e4fa9d4d 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp @@ -24,6 +24,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/ClusterTimerWakeupBody.h" #include "qpid/framing/ClusterTimerDropBody.h" +#include "qpid/sys/ClusterSafe.h" namespace qpid { namespace cluster { @@ -107,6 +108,7 @@ void ClusterTimer::drop(intrusive_ptr t) { // Deliver thread void ClusterTimer::deliverWakeup(const std::string& name) { QPID_LOG(trace, "Cluster timer wakeup delivered for " << name); + qpid::sys::assertClusterSafe(); Map::iterator i = map.find(name); if (i == map.end()) throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name)); diff --git a/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp b/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp index ffa7633e3b..8459938e5c 100644 --- a/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp +++ b/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp @@ -30,6 +30,10 @@ using std::string; using qpid::Exception; +namespace qpid { +namespace log { +namespace posix { + namespace { // SyslogFacilities maps from syslog values to the text equivalents. @@ -110,10 +114,6 @@ std::string basename(const std::string path) { } // namespace -namespace qpid { -namespace log { -namespace posix { - std::ostream& operator<<(std::ostream& o, const SyslogFacility& f) { return o << SyslogFacilities().name(f.value); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 8c2cb95faa..cb07d5d047 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -44,14 +44,15 @@ #include #include +namespace qpid { +namespace management { + using boost::intrusive_ptr; using qpid::framing::Uuid; using qpid::types::Variant; using qpid::amqp_0_10::MapCodec; using qpid::amqp_0_10::ListCodec; -using qpid::sys::Mutex; using namespace qpid::framing; -using namespace qpid::management; using namespace qpid::broker; using namespace qpid; using namespace std; @@ -2961,9 +2962,6 @@ bool ManagementAgent::moveDeletedObjectsLH() { return !deleteList.empty(); } -namespace qpid { -namespace management { - namespace { QPID_TSS const qpid::broker::ConnectionState* executionContext = 0; } @@ -2977,4 +2975,4 @@ const qpid::broker::ConnectionState* getManagementExecutionContext() return executionContext; } -}} +}} \ No newline at end of file diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp index 2a8d971987..20231bf910 100644 --- a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp +++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp @@ -28,6 +28,9 @@ #include "qpid/DataDir.h" #include "qpid/log/Statement.h" +namespace qpid { +namespace store { + /* * The MessageStore pointer given to the Broker points to static storage. * Thus, it cannot be deleted, especially by the broker. To prevent deletion, @@ -42,9 +45,6 @@ namespace { }; } -namespace qpid { -namespace store { - static MessageStorePlugin static_instance_registers_plugin; diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp index 14d63a4cd4..849a0a44e8 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp +++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp @@ -32,6 +32,10 @@ #include "MessageLog.h" #include "Lsn.h" +namespace qpid { +namespace store { +namespace ms_clfs { + namespace { // Structures that hold log records. Each has a type field at the start. @@ -97,10 +101,6 @@ struct MessageDequeue { } // namespace -namespace qpid { -namespace store { -namespace ms_clfs { - void MessageLog::initialize() { diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp index 04780e83e8..0ef046d7c8 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp +++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp @@ -33,6 +33,10 @@ #include "Transaction.h" #include "Lsn.h" +namespace qpid { +namespace store { +namespace ms_clfs { + namespace { // Structures that hold log records. Each has a type field at the start. @@ -95,10 +99,6 @@ struct TransactionDelete { } // namespace -namespace qpid { -namespace store { -namespace ms_clfs { - void TransactionLog::initialize() { diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index c25159985e..01ff8b6bfa 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -41,7 +41,9 @@ #include #include -using namespace qpid::sys; +namespace qpid { +namespace sys { +namespace posix { namespace { @@ -71,10 +73,6 @@ __thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms /* * Asynch Acceptor */ -namespace qpid { -namespace sys { -namespace posix { - class AsynchAcceptor : public qpid::sys::AsynchAcceptor { public: AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); diff --git a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp index b22a615a54..abff8a5be8 100644 --- a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp +++ b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp @@ -1,6 +1,3 @@ -#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP -#define QPID_SYS_LINUX_POLLABLECONDITION_CPP - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -120,5 +117,3 @@ void PollableCondition::set() { impl->set(); } void PollableCondition::clear() { impl->clear(); } }} // namespace qpid::sys - -#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/ diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp index 73f15617dc..2a7cf16923 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp +++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp @@ -37,8 +37,9 @@ #include -using namespace qpid::sys; -using namespace qpid::sys::ssl; +namespace qpid { +namespace sys { +namespace ssl { namespace { @@ -448,3 +449,5 @@ SecuritySettings SslIO::getSecuritySettings() { settings.authid = socket.getClientAuthId(); return settings; } + +}}} diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 30378d4c5f..fb8df5ddf8 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -295,6 +295,8 @@ private: volatile bool queuedDelete; // Socket close requested, but there are operations in progress. volatile bool queuedClose; + // Most recent asynch read request + volatile AsynchReadResult* pendingRead; private: // Dispatch events that have completed. @@ -374,6 +376,7 @@ AsynchIO::AsynchIO(const Socket& s, writeInProgress(false), queuedDelete(false), queuedClose(false), + pendingRead(0), working(false) { } @@ -504,6 +507,7 @@ void AsynchIO::startReading() { } } // On status 0 or WSA_IO_PENDING, completion will handle the rest. + pendingRead = result; } else { notifyBuffersEmpty(); @@ -617,16 +621,17 @@ void AsynchIO::readComplete(AsynchReadResult *result) { int status = result->getStatus(); size_t bytes = result->getTransferred(); if (status == 0 && bytes > 0) { - bool restartRead = true; // May not if receiver doesn't want more if (readCallback) readCallback(*this, result->getBuff()); - if (restartRead) - startReading(); + startReading(); } else { // No data read, so put the buffer back. It may be partially filled, // so "unread" it back to the front of the queue. unread(result->getBuff()); + if (queuedClose && status == ERROR_OPERATION_ABORTED) { + return; // Expected reap from CancelIoEx + } notifyEof(); if (status != 0) { @@ -697,8 +702,11 @@ void AsynchIO::completion(AsynchIoResult *result) { { ScopedUnlock ul(completionLock); AsynchReadResult *r = dynamic_cast(result); - if (r != 0) + if (r != 0) { readComplete(r); + // Set pendingRead to 0 if it's still pointing to (newly completed) r + InterlockedCompareExchangePointer((void * volatile *)&pendingRead, 0, r); + } else { AsynchWriteResult *w = dynamic_cast(result); @@ -732,6 +740,15 @@ void AsynchIO::completion(AsynchIoResult *result) { else if (queuedDelete) delete this; } + else { + if (queuedClose && pendingRead) { + // Force outstanding read to completion. Layer above will + // call back. + CancelIoEx((HANDLE)toSocketHandle(socket), + ((AsynchReadResult *)pendingRead)->overlapped()); + pendingRead = 0; + } + } } } // namespace windows diff --git a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp index 6a1d9045b4..bb637be0a6 100644 --- a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp +++ b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -1,6 +1,3 @@ -#ifndef QPID_SYS_WINDOWS_POLLABLECONDITION_CPP -#define QPID_SYS_WINDOWS_POLLABLECONDITION_CPP - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -110,5 +107,3 @@ void PollableCondition::clear() { } }} // namespace qpid::sys - -#endif /*!QPID_SYS_WINDOWS_POLLABLECONDITION_CPP*/ diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp index 1fa4768329..b085f67539 100644 --- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp @@ -32,6 +32,9 @@ #include +namespace qpid { +namespace sys { + // Need to initialize WinSock. Ideally, this would be a singleton or embedded // in some one-time initialization function. I tried boost singleton and could // not get it to compile (and others located in google had the same problem). @@ -76,13 +79,6 @@ protected: static WinSockSetup setup; -} /* namespace */ - -namespace qpid { -namespace sys { - -namespace { - std::string getName(SOCKET fd, bool local) { ::sockaddr_storage name_s; // big enough for any socket address diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp index 11a3389e45..25cc94b290 100644 --- a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp @@ -38,6 +38,10 @@ #include #include +namespace qpid { +namespace sys { +namespace windows { + namespace { /* @@ -66,10 +70,6 @@ namespace { }; } -namespace qpid { -namespace sys { -namespace windows { - SslAsynchIO::SslAsynchIO(const qpid::sys::Socket& s, CredHandle hCred, ReadCallback rCb, diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp index a0e329ca9d..b5686c6ab8 100644 --- a/qpid/cpp/src/qpidd.cpp +++ b/qpid/cpp/src/qpidd.cpp @@ -29,6 +29,9 @@ #include using namespace std; +namespace qpid { +namespace broker { + auto_ptr options; // Broker real entry; various system-invoked entrypoints call here. @@ -87,3 +90,4 @@ int run_broker(int argc, char *argv[], bool hidden) } return 1; } +}} diff --git a/qpid/cpp/src/qpidd.h b/qpid/cpp/src/qpidd.h index a3150a2737..f7f84d11da 100644 --- a/qpid/cpp/src/qpidd.h +++ b/qpid/cpp/src/qpidd.h @@ -26,6 +26,9 @@ #include +namespace qpid { +namespace broker { + // BootstrapOptions is a minimal subset of options used for a pre-parse // of the command line to discover which plugin modules need to be loaded. // The pre-parse is necessary because plugin modules may supply their own @@ -70,4 +73,5 @@ public: // Broker real entry; various system-invoked entrypoints call here. int run_broker(int argc, char *argv[], bool hidden = false); +}} #endif /*!QPID_H*/ diff --git a/qpid/cpp/src/tests/RefCounted.cpp b/qpid/cpp/src/tests/RefCounted.cpp index e4c1da5696..3ac3895322 100644 --- a/qpid/cpp/src/tests/RefCounted.cpp +++ b/qpid/cpp/src/tests/RefCounted.cpp @@ -21,15 +21,15 @@ #include "unit_test.h" +namespace qpid { +namespace tests { + QPID_AUTO_TEST_SUITE(RefCountedTestSuiteTestSuite) using boost::intrusive_ptr; using namespace std; using namespace qpid; -namespace qpid { -namespace tests { - struct CountMe : public RefCounted { static int instances; CountMe() { ++instances; } diff --git a/qpid/cpp/src/tests/StringUtils.cpp b/qpid/cpp/src/tests/StringUtils.cpp index 6a19119288..c50287a4f4 100644 --- a/qpid/cpp/src/tests/StringUtils.cpp +++ b/qpid/cpp/src/tests/StringUtils.cpp @@ -23,9 +23,11 @@ #include "unit_test.h" +namespace qpid { +namespace tests { + QPID_AUTO_TEST_SUITE(StringUtilsTestSuite) -using namespace qpid; using std::string; QPID_AUTO_TEST_CASE(testSplit_general) @@ -75,3 +77,5 @@ QPID_AUTO_TEST_CASE(testSplit_empty) } QPID_AUTO_TEST_SUITE_END() + +}} diff --git a/qpid/cpp/src/tests/cli_tests.py b/qpid/cpp/src/tests/cli_tests.py index b9a7dda15c..7ac5b1deed 100755 --- a/qpid/cpp/src/tests/cli_tests.py +++ b/qpid/cpp/src/tests/cli_tests.py @@ -330,6 +330,9 @@ class CliTests(TestBase010): ret = os.system(self.qpid_config_command(" add queue %s --alternate-exchange=%s" % (qName, altName))) self.assertEqual(ret, 0) + ret = os.system(self.qpid_config_command(" queues")) + self.assertEqual(ret, 0) + queues = self.broker_access.getAllQueues() found = False for queue in queues: diff --git a/qpid/cpp/src/tests/qpid-ping.cpp b/qpid/cpp/src/tests/qpid-ping.cpp index 0cb4afa0ee..52331499e7 100644 --- a/qpid/cpp/src/tests/qpid-ping.cpp +++ b/qpid/cpp/src/tests/qpid-ping.cpp @@ -32,11 +32,20 @@ #include #include -using namespace std; -using namespace qpid::sys; -using namespace qpid::framing; -using namespace qpid::client; -using namespace qpid; +using std::cerr; +using std::cout; +using std::endl; +using std::exception; +using std::string; +using namespace qpid::client::arg; // For keyword args +using qpid::client::AsyncSession; +using qpid::client::Connection; +using qpid::client::Message; +using qpid::client::SubscriptionManager; +using qpid::framing::Uuid; + +namespace qpid { +namespace tests { struct PingOptions : public qpid::TestOptions { int timeout; // Timeout in seconds. @@ -48,9 +57,11 @@ struct PingOptions : public qpid::TestOptions { } }; +}} // namespace qpid::tests + int main(int argc, char** argv) { try { - PingOptions opts; + qpid::tests::PingOptions opts; opts.parse(argc, argv); opts.con.heartbeat = (opts.timeout+1)/2; Connection connection; @@ -58,8 +69,8 @@ int main(int argc, char** argv) { if (!opts.quiet) cout << "Opened connection." << endl; AsyncSession s = connection.newSession(); string qname(Uuid(true).str()); - s.queueDeclare(arg::queue=qname,arg::autoDelete=true,arg::exclusive=true); - s.messageTransfer(arg::content=Message("hello", qname)); + s.queueDeclare(queue=qname, autoDelete=true, exclusive=true); + s.messageTransfer(content=Message("hello", qname)); if (!opts.quiet) cout << "Sent message." << endl; SubscriptionManager subs(s); subs.get(qname); diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp index b1213a484f..91eef0cd71 100644 --- a/qpid/cpp/src/tests/qpid-send.cpp +++ b/qpid/cpp/src/tests/qpid-send.cpp @@ -36,15 +36,26 @@ #include #include -using namespace std; -using namespace qpid::messaging; -using namespace qpid::types; - -typedef std::vector string_vector; +using std::string; +using std::ios_base; + +using qpid::messaging::Address; +using qpid::messaging::Connection; +using qpid::messaging::Duration; +using qpid::messaging::FailoverUpdates; +using qpid::messaging::Message; +using qpid::messaging::Receiver; +using qpid::messaging::Session; +using qpid::messaging::Sender; +using qpid::types::Exception; +using qpid::types::Uuid; +using qpid::types::Variant; namespace qpid { namespace tests { +typedef std::vector string_vector; + struct Options : public qpid::Options { bool help; @@ -223,10 +234,6 @@ const string EOS("eos"); const string SN("sn"); const string TS("ts"); -}} // namespace qpid::tests - -using namespace qpid::tests; - class ContentGenerator { public: virtual ~ContentGenerator() {} @@ -329,6 +336,20 @@ public: } }; +}} // namespace qpid::tests + +using qpid::tests::Options; +using qpid::tests::Reporter; +using qpid::tests::Throughput; +using qpid::tests::ContentGenerator; +using qpid::tests::GroupGenerator; +using qpid::tests::GetlineContentGenerator; +using qpid::tests::MapContentGenerator; +using qpid::tests::FixedContentGenerator; +using qpid::tests::SN; +using qpid::tests::TS; +using qpid::tests::EOS; + int main(int argc, char ** argv) { Connection connection; diff --git a/qpid/cpp/src/tests/testagent.cpp b/qpid/cpp/src/tests/testagent.cpp index 98520b424a..e6010a8e00 100644 --- a/qpid/cpp/src/tests/testagent.cpp +++ b/qpid/cpp/src/tests/testagent.cpp @@ -36,9 +36,12 @@ #include +namespace qpid { +namespace tests { + static bool running = true; -using namespace std; +using std::string; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; @@ -191,12 +194,14 @@ int main_int(int argc, char** argv) return 0; } +}} // namespace qpid::tests + int main(int argc, char** argv) { try { - return main_int(argc, argv); + return qpid::tests::main_int(argc, argv); } catch(std::exception& e) { - cerr << "Top Level Exception: " << e.what() << endl; + std::cerr << "Top Level Exception: " << e.what() << std::endl; return 1; } } diff --git a/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp b/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp index 024f20b147..14f1e46606 100644 --- a/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp +++ b/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp @@ -32,7 +32,9 @@ #include #include -namespace { +namespace qpid { +namespace tests { +namespace windows { // Instead of popping up a window for exceptions, just print something out LONG _stdcall UnhandledExceptionFilter (PEXCEPTION_POINTERS pExceptionInfo) @@ -73,4 +75,4 @@ redirect_errors_to_stderr::redirect_errors_to_stderr() SetUnhandledExceptionFilter (&UnhandledExceptionFilter); } -} // namespace +}}} // namespace diff --git a/qpid/cpp/src/windows/QpiddBroker.cpp b/qpid/cpp/src/windows/QpiddBroker.cpp index 42ba97bdb1..e73fcf0af5 100644 --- a/qpid/cpp/src/windows/QpiddBroker.cpp +++ b/qpid/cpp/src/windows/QpiddBroker.cpp @@ -32,7 +32,8 @@ #include #include -using namespace qpid::broker; +namespace qpid { +namespace broker { BootstrapOptions::BootstrapOptions(const char* argv0) : qpid::Options("Options"), @@ -451,6 +452,7 @@ int QpiddBroker::execute (QpiddOptions *options) { return 0; } +}} // namespace qpid::broker int main(int argc, char* argv[]) { @@ -459,13 +461,13 @@ int main(int argc, char* argv[]) // the service is stopped. SERVICE_TABLE_ENTRY dispatchTable[] = { - { "", (LPSERVICE_MAIN_FUNCTION)ServiceMain }, + { "", (LPSERVICE_MAIN_FUNCTION)qpid::broker::ServiceMain }, { NULL, NULL } }; if (!StartServiceCtrlDispatcher(dispatchTable)) { DWORD err = ::GetLastError(); if (err == ERROR_FAILED_SERVICE_CONTROLLER_CONNECT) // Run as console - return run_broker(argc, argv); + return qpid::broker::run_broker(argc, argv); throw QPID_WINDOWS_ERROR(err); } return 0; diff --git a/qpid/cpp/src/windows/SCM.cpp b/qpid/cpp/src/windows/SCM.cpp index 232bb04c17..2eeb143427 100644 --- a/qpid/cpp/src/windows/SCM.cpp +++ b/qpid/cpp/src/windows/SCM.cpp @@ -1,332 +1,332 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/log/Statement.h" -#include "qpid/sys/windows/check.h" -#include "SCM.h" - -#pragma comment(lib, "advapi32.lib") - -namespace { - -// Container that will close a SC_HANDLE upon destruction. -class AutoServiceHandle { -public: - AutoServiceHandle(SC_HANDLE h_ = NULL) : h(h_) {} - ~AutoServiceHandle() { if (h != NULL) ::CloseServiceHandle(h); } - void release() { h = NULL; } - void reset(SC_HANDLE newHandle) - { - if (h != NULL) - ::CloseServiceHandle(h); - h = newHandle; - } - operator SC_HANDLE() const { return h; } - -private: - SC_HANDLE h; -}; - -} - -namespace qpid { -namespace windows { - -SCM::SCM() : scmHandle(NULL) -{ -} - -SCM::~SCM() -{ - if (NULL != scmHandle) - ::CloseServiceHandle(scmHandle); -} - -/** - * Install this executable as a service - */ -void SCM::install(const string& serviceName, - const string& serviceDesc, - const string& args, - DWORD startType, - const string& account, - const string& password, - const string& depends) -{ - // Handle dependent service name list; Windows wants a set of nul-separated - // names ending with a double nul. - string depends2 = depends; - if (!depends2.empty()) { - // CDL to null delimiter w/ trailing double null - size_t p = 0; - while ((p = depends2.find_first_of( ',', p)) != string::npos) - depends2.replace(p, 1, 1, '\0'); - depends2.push_back('\0'); - depends2.push_back('\0'); - } - -#if 0 - // I'm nervous about adding a user/password check here. Is this a - // potential attack vector, letting users check passwords without - // control? -Steve Huston, Feb 24, 2011 - - // Validate account, password - HANDLE hToken = NULL; - bool logStatus = false; - if (!account.empty() && !password.empty() && - !(logStatus = ::LogonUserA(account.c_str(), - "", - password.c_str(), - LOGON32_LOGON_NETWORK, - LOGON32_PROVIDER_DEFAULT, - &hToken ) != 0)) - std::cout << "warning: supplied account & password failed with LogonUser." << std::endl; - if (logStatus) - ::CloseHandle(hToken); -#endif - - // Get fully qualified .exe name - char myPath[MAX_PATH]; - DWORD myPathLength = ::GetModuleFileName(NULL, myPath, MAX_PATH); - QPID_WINDOWS_CHECK_NOT(myPathLength, 0); - string imagePath(myPath, myPathLength); - if (!args.empty()) - imagePath += " " + args; - - // Ensure there's a handle to the SCM database. - openSvcManager(); - - // Create the service - SC_HANDLE svcHandle; - svcHandle = ::CreateService(scmHandle, // SCM database - serviceName.c_str(), // name of service - serviceDesc.c_str(), // name to display - SERVICE_ALL_ACCESS, // desired access - SERVICE_WIN32_OWN_PROCESS, // service type - startType, // start type - SERVICE_ERROR_NORMAL, // error cntrl type - imagePath.c_str(), // path to service's binary w/ optional arguments - NULL, // no load ordering group - NULL, // no tag identifier - depends2.empty() ? NULL : depends2.c_str(), - account.empty() ? NULL : account.c_str(), // account name, or NULL for LocalSystem - password.empty() ? NULL : password.c_str()); // password, or NULL for none - QPID_WINDOWS_CHECK_NULL(svcHandle); - ::CloseServiceHandle(svcHandle); - QPID_LOG(info, "Service installed successfully"); -} - -/** - * - */ -void SCM::uninstall(const string& serviceName) -{ - // Ensure there's a handle to the SCM database. - openSvcManager(); - AutoServiceHandle svc(::OpenService(scmHandle, - serviceName.c_str(), - DELETE)); - QPID_WINDOWS_CHECK_NULL((SC_HANDLE)svc); - QPID_WINDOWS_CHECK_NOT(::DeleteService(svc), 0); - QPID_LOG(info, "Service deleted successfully."); -} - -/** - * Attempt to start the service. - */ -void SCM::start(const string& serviceName) -{ - // Ensure we have a handle to the SCM database. - openSvcManager(); - - // Get a handle to the service. - AutoServiceHandle svc(::OpenService(scmHandle, - serviceName.c_str(), - SERVICE_ALL_ACCESS)); - QPID_WINDOWS_CHECK_NULL(svc); - - // Check the status in case the service is not stopped. - DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING); - if (state == SERVICE_STOP_PENDING) - throw qpid::Exception("Timed out waiting for running service to stop."); - - // Attempt to start the service. - QPID_WINDOWS_CHECK_NOT(::StartService(svc, 0, NULL), 0); - - QPID_LOG(info, "Service start pending..."); - - // Check the status until the service is no longer start pending. - state = waitForStateChangeFrom(svc, SERVICE_START_PENDING); - // Determine whether the service is running. - if (state == SERVICE_RUNNING) { - QPID_LOG(info, "Service started successfully"); - } - else { - throw qpid::Exception(QPID_MSG("Service not yet running; state now " << state)); - } -} - -/** - * - */ -void SCM::stop(const string& serviceName) -{ - // Ensure a handle to the SCM database. - openSvcManager(); - - // Get a handle to the service. - AutoServiceHandle svc(::OpenService(scmHandle, - serviceName.c_str(), - SERVICE_STOP | SERVICE_QUERY_STATUS | - SERVICE_ENUMERATE_DEPENDENTS)); - QPID_WINDOWS_CHECK_NULL(svc); - - // Make sure the service is not already stopped; if it's stop-pending, - // wait for it to finalize. - DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING); - if (state == SERVICE_STOPPED) { - QPID_LOG(info, "Service is already stopped"); - return; - } - - // If the service is running, dependencies must be stopped first. - std::auto_ptr deps; - DWORD numDeps = getDependentServices(svc, deps); - for (DWORD i = 0; i < numDeps; i++) - stop(deps.get()[i].lpServiceName); - - // Dependents stopped; send a stop code to the service. - SERVICE_STATUS_PROCESS ssp; - if (!::ControlService(svc, SERVICE_CONTROL_STOP, (LPSERVICE_STATUS)&ssp)) - throw qpid::Exception(QPID_MSG("Stopping " << serviceName << ": " << - qpid::sys::strError(::GetLastError()))); - - // Wait for the service to stop. - state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING); - if (state == SERVICE_STOPPED) - QPID_LOG(info, QPID_MSG("Service " << serviceName << - " stopped successfully.")); -} - -/** - * - */ -void SCM::openSvcManager() -{ - if (NULL != scmHandle) - return; - - scmHandle = ::OpenSCManager(NULL, // local computer - NULL, // ServicesActive database - SC_MANAGER_ALL_ACCESS); // Rights - QPID_WINDOWS_CHECK_NULL(scmHandle); -} - -DWORD SCM::waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState) -{ - SERVICE_STATUS_PROCESS ssStatus; - DWORD bytesNeeded; - DWORD waitTime; - if (!::QueryServiceStatusEx(svc, // handle to service - SC_STATUS_PROCESS_INFO, // information level - (LPBYTE)&ssStatus, // address of structure - sizeof(ssStatus), // size of structure - &bytesNeeded)) // size needed if buffer is too small - throw QPID_WINDOWS_ERROR(::GetLastError()); - - // Save the tick count and initial checkpoint. - DWORD startTickCount = ::GetTickCount(); - DWORD oldCheckPoint = ssStatus.dwCheckPoint; - - // Wait for the service to change out of the noted state. - while (ssStatus.dwCurrentState == originalState) { - // Do not wait longer than the wait hint. A good interval is - // one-tenth of the wait hint but not less than 1 second - // and not more than 10 seconds. - waitTime = ssStatus.dwWaitHint / 10; - if (waitTime < 1000) - waitTime = 1000; - else if (waitTime > 10000) - waitTime = 10000; - - ::Sleep(waitTime); - - // Check the status until the service is no longer stop pending. - if (!::QueryServiceStatusEx(svc, - SC_STATUS_PROCESS_INFO, - (LPBYTE) &ssStatus, - sizeof(ssStatus), - &bytesNeeded)) - throw QPID_WINDOWS_ERROR(::GetLastError()); - - if (ssStatus.dwCheckPoint > oldCheckPoint) { - // Continue to wait and check. - startTickCount = ::GetTickCount(); - oldCheckPoint = ssStatus.dwCheckPoint; - } else { - if ((::GetTickCount() - startTickCount) > ssStatus.dwWaitHint) - break; - } - } - return ssStatus.dwCurrentState; -} - -/** - * Get the services that depend on @arg svc. All dependent service info - * is returned in an array of ENUM_SERVICE_STATUS structures via @arg deps. - * - * @retval The number of dependent services. - */ -DWORD SCM::getDependentServices(SC_HANDLE svc, - std::auto_ptr& deps) -{ - DWORD bytesNeeded; - DWORD numEntries; - - // Pass a zero-length buffer to get the required buffer size. - if (::EnumDependentServices(svc, - SERVICE_ACTIVE, - 0, - 0, - &bytesNeeded, - &numEntries)) { - // If the Enum call succeeds, then there are no dependent - // services, so do nothing. - return 0; - } - - if (::GetLastError() != ERROR_MORE_DATA) - throw QPID_WINDOWS_ERROR((::GetLastError())); - - // Allocate a buffer for the dependencies. - deps.reset((LPENUM_SERVICE_STATUS)(new char[bytesNeeded])); - // Enumerate the dependencies. - if (!::EnumDependentServices(svc, - SERVICE_ACTIVE, - deps.get(), - bytesNeeded, - &bytesNeeded, - &numEntries)) - throw QPID_WINDOWS_ERROR((::GetLastError())); - return numEntries; -} - -} } // namespace qpid::windows +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "qpid/sys/windows/check.h" +#include "SCM.h" + +#pragma comment(lib, "advapi32.lib") + +namespace qpid { +namespace windows { + +namespace { + +// Container that will close a SC_HANDLE upon destruction. +class AutoServiceHandle { +public: + AutoServiceHandle(SC_HANDLE h_ = NULL) : h(h_) {} + ~AutoServiceHandle() { if (h != NULL) ::CloseServiceHandle(h); } + void release() { h = NULL; } + void reset(SC_HANDLE newHandle) + { + if (h != NULL) + ::CloseServiceHandle(h); + h = newHandle; + } + operator SC_HANDLE() const { return h; } + +private: + SC_HANDLE h; +}; + +} + +SCM::SCM() : scmHandle(NULL) +{ +} + +SCM::~SCM() +{ + if (NULL != scmHandle) + ::CloseServiceHandle(scmHandle); +} + +/** + * Install this executable as a service + */ +void SCM::install(const string& serviceName, + const string& serviceDesc, + const string& args, + DWORD startType, + const string& account, + const string& password, + const string& depends) +{ + // Handle dependent service name list; Windows wants a set of nul-separated + // names ending with a double nul. + string depends2 = depends; + if (!depends2.empty()) { + // CDL to null delimiter w/ trailing double null + size_t p = 0; + while ((p = depends2.find_first_of( ',', p)) != string::npos) + depends2.replace(p, 1, 1, '\0'); + depends2.push_back('\0'); + depends2.push_back('\0'); + } + +#if 0 + // I'm nervous about adding a user/password check here. Is this a + // potential attack vector, letting users check passwords without + // control? -Steve Huston, Feb 24, 2011 + + // Validate account, password + HANDLE hToken = NULL; + bool logStatus = false; + if (!account.empty() && !password.empty() && + !(logStatus = ::LogonUserA(account.c_str(), + "", + password.c_str(), + LOGON32_LOGON_NETWORK, + LOGON32_PROVIDER_DEFAULT, + &hToken ) != 0)) + std::cout << "warning: supplied account & password failed with LogonUser." << std::endl; + if (logStatus) + ::CloseHandle(hToken); +#endif + + // Get fully qualified .exe name + char myPath[MAX_PATH]; + DWORD myPathLength = ::GetModuleFileName(NULL, myPath, MAX_PATH); + QPID_WINDOWS_CHECK_NOT(myPathLength, 0); + string imagePath(myPath, myPathLength); + if (!args.empty()) + imagePath += " " + args; + + // Ensure there's a handle to the SCM database. + openSvcManager(); + + // Create the service + SC_HANDLE svcHandle; + svcHandle = ::CreateService(scmHandle, // SCM database + serviceName.c_str(), // name of service + serviceDesc.c_str(), // name to display + SERVICE_ALL_ACCESS, // desired access + SERVICE_WIN32_OWN_PROCESS, // service type + startType, // start type + SERVICE_ERROR_NORMAL, // error cntrl type + imagePath.c_str(), // path to service's binary w/ optional arguments + NULL, // no load ordering group + NULL, // no tag identifier + depends2.empty() ? NULL : depends2.c_str(), + account.empty() ? NULL : account.c_str(), // account name, or NULL for LocalSystem + password.empty() ? NULL : password.c_str()); // password, or NULL for none + QPID_WINDOWS_CHECK_NULL(svcHandle); + ::CloseServiceHandle(svcHandle); + QPID_LOG(info, "Service installed successfully"); +} + +/** + * + */ +void SCM::uninstall(const string& serviceName) +{ + // Ensure there's a handle to the SCM database. + openSvcManager(); + AutoServiceHandle svc(::OpenService(scmHandle, + serviceName.c_str(), + DELETE)); + QPID_WINDOWS_CHECK_NULL((SC_HANDLE)svc); + QPID_WINDOWS_CHECK_NOT(::DeleteService(svc), 0); + QPID_LOG(info, "Service deleted successfully."); +} + +/** + * Attempt to start the service. + */ +void SCM::start(const string& serviceName) +{ + // Ensure we have a handle to the SCM database. + openSvcManager(); + + // Get a handle to the service. + AutoServiceHandle svc(::OpenService(scmHandle, + serviceName.c_str(), + SERVICE_ALL_ACCESS)); + QPID_WINDOWS_CHECK_NULL(svc); + + // Check the status in case the service is not stopped. + DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING); + if (state == SERVICE_STOP_PENDING) + throw qpid::Exception("Timed out waiting for running service to stop."); + + // Attempt to start the service. + QPID_WINDOWS_CHECK_NOT(::StartService(svc, 0, NULL), 0); + + QPID_LOG(info, "Service start pending..."); + + // Check the status until the service is no longer start pending. + state = waitForStateChangeFrom(svc, SERVICE_START_PENDING); + // Determine whether the service is running. + if (state == SERVICE_RUNNING) { + QPID_LOG(info, "Service started successfully"); + } + else { + throw qpid::Exception(QPID_MSG("Service not yet running; state now " << state)); + } +} + +/** + * + */ +void SCM::stop(const string& serviceName) +{ + // Ensure a handle to the SCM database. + openSvcManager(); + + // Get a handle to the service. + AutoServiceHandle svc(::OpenService(scmHandle, + serviceName.c_str(), + SERVICE_STOP | SERVICE_QUERY_STATUS | + SERVICE_ENUMERATE_DEPENDENTS)); + QPID_WINDOWS_CHECK_NULL(svc); + + // Make sure the service is not already stopped; if it's stop-pending, + // wait for it to finalize. + DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING); + if (state == SERVICE_STOPPED) { + QPID_LOG(info, "Service is already stopped"); + return; + } + + // If the service is running, dependencies must be stopped first. + std::auto_ptr deps; + DWORD numDeps = getDependentServices(svc, deps); + for (DWORD i = 0; i < numDeps; i++) + stop(deps.get()[i].lpServiceName); + + // Dependents stopped; send a stop code to the service. + SERVICE_STATUS_PROCESS ssp; + if (!::ControlService(svc, SERVICE_CONTROL_STOP, (LPSERVICE_STATUS)&ssp)) + throw qpid::Exception(QPID_MSG("Stopping " << serviceName << ": " << + qpid::sys::strError(::GetLastError()))); + + // Wait for the service to stop. + state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING); + if (state == SERVICE_STOPPED) + QPID_LOG(info, QPID_MSG("Service " << serviceName << + " stopped successfully.")); +} + +/** + * + */ +void SCM::openSvcManager() +{ + if (NULL != scmHandle) + return; + + scmHandle = ::OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // Rights + QPID_WINDOWS_CHECK_NULL(scmHandle); +} + +DWORD SCM::waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState) +{ + SERVICE_STATUS_PROCESS ssStatus; + DWORD bytesNeeded; + DWORD waitTime; + if (!::QueryServiceStatusEx(svc, // handle to service + SC_STATUS_PROCESS_INFO, // information level + (LPBYTE)&ssStatus, // address of structure + sizeof(ssStatus), // size of structure + &bytesNeeded)) // size needed if buffer is too small + throw QPID_WINDOWS_ERROR(::GetLastError()); + + // Save the tick count and initial checkpoint. + DWORD startTickCount = ::GetTickCount(); + DWORD oldCheckPoint = ssStatus.dwCheckPoint; + + // Wait for the service to change out of the noted state. + while (ssStatus.dwCurrentState == originalState) { + // Do not wait longer than the wait hint. A good interval is + // one-tenth of the wait hint but not less than 1 second + // and not more than 10 seconds. + waitTime = ssStatus.dwWaitHint / 10; + if (waitTime < 1000) + waitTime = 1000; + else if (waitTime > 10000) + waitTime = 10000; + + ::Sleep(waitTime); + + // Check the status until the service is no longer stop pending. + if (!::QueryServiceStatusEx(svc, + SC_STATUS_PROCESS_INFO, + (LPBYTE) &ssStatus, + sizeof(ssStatus), + &bytesNeeded)) + throw QPID_WINDOWS_ERROR(::GetLastError()); + + if (ssStatus.dwCheckPoint > oldCheckPoint) { + // Continue to wait and check. + startTickCount = ::GetTickCount(); + oldCheckPoint = ssStatus.dwCheckPoint; + } else { + if ((::GetTickCount() - startTickCount) > ssStatus.dwWaitHint) + break; + } + } + return ssStatus.dwCurrentState; +} + +/** + * Get the services that depend on @arg svc. All dependent service info + * is returned in an array of ENUM_SERVICE_STATUS structures via @arg deps. + * + * @retval The number of dependent services. + */ +DWORD SCM::getDependentServices(SC_HANDLE svc, + std::auto_ptr& deps) +{ + DWORD bytesNeeded; + DWORD numEntries; + + // Pass a zero-length buffer to get the required buffer size. + if (::EnumDependentServices(svc, + SERVICE_ACTIVE, + 0, + 0, + &bytesNeeded, + &numEntries)) { + // If the Enum call succeeds, then there are no dependent + // services, so do nothing. + return 0; + } + + if (::GetLastError() != ERROR_MORE_DATA) + throw QPID_WINDOWS_ERROR((::GetLastError())); + + // Allocate a buffer for the dependencies. + deps.reset((LPENUM_SERVICE_STATUS)(new char[bytesNeeded])); + // Enumerate the dependencies. + if (!::EnumDependentServices(svc, + SERVICE_ACTIVE, + deps.get(), + bytesNeeded, + &bytesNeeded, + &numEntries)) + throw QPID_WINDOWS_ERROR((::GetLastError())); + return numEntries; +} + +} } // namespace qpid::windows diff --git a/qpid/java/build.deps b/qpid/java/build.deps index fe0ca6362b..ec9eacb169 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -35,6 +35,7 @@ geronimo-kernel=lib/geronimo-kernel-2.2.1.jar geronimo-openejb=lib/geronimo-ejb_3.0_spec-1.0.1.jar junit=lib/junit-3.8.1.jar +mockito-all=lib/mockito-all-1.9.0.jar log4j=lib/log4j-1.2.12.jar @@ -58,7 +59,7 @@ broker.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \ broker-plugins.libs=${felix.libs} ${log4j} ${commons-configuration.libs} -junit-toolkit.libs=${log4j} ${junit} ${slf4j-api} +junit-toolkit.libs=${log4j} ${junit} ${slf4j-api} ${mockito-all} test.libs=${slf4j-log4j} ${junit-toolkit.libs} ibm-icu=lib/com.ibm.icu_3.8.1.v20080530.jar diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 56ee56d178..a1a06c5547 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -327,6 +327,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } + _conn.setClosed(); + ExceptionListener listener = _conn.getExceptionListenerNoCheck(); if (listener == null) { diff --git a/qpid/java/lib/mockito-all-1.9.0.jar b/qpid/java/lib/mockito-all-1.9.0.jar new file mode 100644 index 0000000000..273fd50feb Binary files /dev/null and b/qpid/java/lib/mockito-all-1.9.0.jar differ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java index 6b83929258..5b3bca7033 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.management.jmx.ManagedConnectionMBeanTest; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.transport.ConnectionException; @@ -62,10 +63,13 @@ public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase { final Class expectedLinkedException = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class; + assertConnectionOpen(); + stopBroker(); JMSException exception = _recordingExceptionListener.awaitException(10000); assertConnectionCloseWasReported(exception, expectedLinkedException); + assertConnectionClosed(); ensureCanCloseWithoutException(); } @@ -79,10 +83,13 @@ public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase return; } + assertConnectionOpen(); + killBroker(); JMSException exception = _recordingExceptionListener.awaitException(10000); assertConnectionCloseWasReported(exception, expectedLinkedException); + assertConnectionClosed(); ensureCanCloseWithoutException(); } @@ -107,6 +114,16 @@ public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase assertEquals("Unexpected linked exception", linkedExceptionClass, exception.getLinkedException().getClass()); } + private void assertConnectionClosed() + { + assertTrue("Connection should be marked as closed", ((AMQConnection)_connection).isClosed()); + } + + private void assertConnectionOpen() + { + assertFalse("Connection should not be marked as closed", ((AMQConnection)_connection).isClosed()); + } + private final class RecordingExceptionListener implements ExceptionListener { private final CountDownLatch _exceptionReceivedLatch = new CountDownLatch(1); diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py index 18fb0a61a3..ace7611a2f 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py @@ -1122,6 +1122,70 @@ class MultiConsumerMsgGroupTests(Base): snd.close() + def test_ttl_expire(self): + """ Verify that expired (TTL) group messages are skipped correctly + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") + + groups = ["A","B","C","A","B","C"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + if m.properties['THE-GROUP'] == 'B': + m.ttl = 1; + snd.send(m) + + sleep(2) # let all B's expire + + # create consumers on separate sessions: C1,C2 + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":0}) + s2 = self.setup_session() + c2 = s2.receiver("msg-group-q", options={"capacity":0}) + + # C1 should acquire A-0, then C2 should acquire C-2, Group B should + # expire and never be fetched + + m1 = c1.fetch(0); + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + m2 = c2.fetch(0); + assert m2.properties['THE-GROUP'] == 'C' + assert m2.content['index'] == 2 + + m1 = c1.fetch(0); + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 3 + + m2 = c2.fetch(0); + assert m2.properties['THE-GROUP'] == 'C' + assert m2.content['index'] == 5 + + # there should be no more left for either consumer + try: + mx = c1.fetch(0) + assert False # should never get here + except Empty: + pass + try: + mx = c2.fetch(0) + assert False # should never get here + except Empty: + pass + + c1.session.acknowledge() + c2.session.acknowledge() + c1.close() + c2.close() + snd.close() + + class StickyConsumerMsgGroupTests(Base): """ Tests for the behavior of sticky-consumer message groups. These tests diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config index 896ae89faf..1308df765d 100755 --- a/qpid/tools/src/py/qpid-config +++ b/qpid/tools/src/py/qpid-config @@ -481,7 +481,7 @@ class BrokerManager: if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY], if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION], if q.altExchange: - print "--alternate-exchange=%s" % q._altExchange_.name, + print "--alternate-exchange=%s" % q.altExchange, if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE], if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE], if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT], -- cgit v1.2.1