From 6eb1a4b5693bbbc75319e19bc4686aa21f2bc5ec Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 5 Oct 2009 10:47:52 +0000 Subject: Merged from trunk up to r800440 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@821749 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/amqp_0_10/Connection.cpp | 13 +- qpid/cpp/src/qpid/amqp_0_10/Connection.h | 5 + qpid/cpp/src/qpid/broker/Broker.h | 16 +- qpid/cpp/src/qpid/broker/Connection.cpp | 51 ++-- qpid/cpp/src/qpid/broker/Connection.h | 4 +- qpid/cpp/src/qpid/broker/DtxManager.cpp | 6 +- qpid/cpp/src/qpid/broker/DtxManager.h | 8 +- qpid/cpp/src/qpid/broker/DtxTimeout.h | 6 +- qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 10 +- qpid/cpp/src/qpid/broker/LinkRegistry.h | 6 +- qpid/cpp/src/qpid/broker/Queue.cpp | 28 +- qpid/cpp/src/qpid/broker/Queue.h | 3 + qpid/cpp/src/qpid/broker/QueueCleaner.cpp | 8 +- qpid/cpp/src/qpid/broker/QueueCleaner.h | 12 +- qpid/cpp/src/qpid/broker/SessionState.cpp | 17 +- qpid/cpp/src/qpid/broker/SessionState.h | 7 +- qpid/cpp/src/qpid/client/Connector.cpp | 5 +- qpid/cpp/src/qpid/cluster/Cluster.cpp | 21 +- qpid/cpp/src/qpid/cluster/Cluster.h | 2 +- qpid/cpp/src/qpid/cluster/ClusterMap.cpp | 2 +- qpid/cpp/src/qpid/cluster/ClusterMap.h | 9 +- qpid/cpp/src/qpid/cluster/Connection.cpp | 2 +- qpid/cpp/src/qpid/cluster/Connection.h | 2 +- qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp | 17 +- qpid/cpp/src/qpid/cluster/ConnectionCodec.h | 3 +- qpid/cpp/src/qpid/cluster/ErrorCheck.cpp | 80 ++++-- qpid/cpp/src/qpid/cluster/ErrorCheck.h | 9 +- qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp | 8 +- qpid/cpp/src/qpid/cluster/ExpiryPolicy.h | 9 +- qpid/cpp/src/qpid/framing/AMQFrame.h | 4 +- qpid/cpp/src/qpid/management/ManagementAgent.cpp | 6 +- qpid/cpp/src/qpid/management/ManagementAgent.h | 6 +- .../qpid/replication/ReplicatingEventListener.cpp | 1 + .../src/qpid/replication/ReplicationExchange.cpp | 32 ++- qpid/cpp/src/qpid/replication/constants.h | 1 + qpid/cpp/src/qpid/sys/Timer.cpp | 26 +- qpid/cpp/src/tests/Makefile.am | 2 +- qpid/cpp/src/tests/PartialFailure.cpp | 27 +- qpid/cpp/src/tests/QueueTest.cpp | 37 ++- qpid/cpp/src/tests/allSegmentTypes.h | 128 --------- qpid/cpp/src/tests/replication_test | 63 ++++- qpid/cpp/xml/cluster.xml | 4 +- .../templ.java/model/ProtocolVersionListClass.vm | 6 + .../java/org/apache/qpid/server/AMQChannel.java | 28 +- .../server/configuration/ServerConfiguration.java | 4 +- .../org/apache/qpid/server/logging/LogActor.java | 19 +- .../qpid/server/logging/RootMessageLogger.java | 8 + .../qpid/server/logging/RootMessageLoggerImpl.java | 5 + .../qpid/server/logging/actors/AbstractActor.java | 18 ++ .../logging/messages/LogMessages_en_US.properties | 6 +- .../logging/rawloggers/Log4jMessageLogger.java | 1 + .../logging/subjects/MessagesStoreLogSubject.java | 45 +++ .../server/protocol/AMQMinaProtocolSession.java | 96 ++++--- .../qpid/server/protocol/AMQProtocolSession.java | 4 + .../server/protocol/AMQProtocolSessionMBean.java | 42 ++- .../qpid/server/virtualhost/VirtualHost.java | 7 +- .../exchange/AbstractHeadersExchangeTestBase.java | 13 +- .../qpid/server/exchange/DestWildExchangeTest.java | 2 +- .../qpid/server/exchange/HeadersExchangeTest.java | 41 +-- .../logging/actors/AMQPChannelActorTest.java | 236 +++++++++++----- .../logging/actors/AMQPConnectionActorTest.java | 113 +++----- .../server/logging/actors/CurrentActorTest.java | 34 +-- .../logging/messages/ChannelMessagesTest.java | 7 +- .../logging/messages/ConnectionMessagesTest.java | 31 ++- .../logging/rawloggers/Log4jMessageLoggerTest.java | 66 +---- .../logging/subjects/AbstractTestLogSubject.java | 32 ++- .../logging/subjects/ChannelLogSubjectTest.java | 24 -- .../logging/subjects/ConnectionLogSubjectTest.java | 28 +- .../subjects/MessageStoreLogSubjectTest.java | 59 ++++ .../subjects/SubscriptionLogSubjectTest.java | 32 +-- .../protocol/AMQProtocolSessionMBeanTest.java | 3 +- .../protocol/InternalTestProtocolSession.java | 16 +- .../qpid/server/protocol/MaxChannelsTest.java | 11 +- .../qpid/server/queue/AMQQueueAlertTest.java | 8 +- .../qpid/server/queue/AMQQueueMBeanTest.java | 5 +- .../java/org/apache/qpid/server/queue/AckTest.java | 11 +- .../qpid/server/queue/MockProtocolSession.java | 286 ------------------- .../server/security/access/ACLManagerTest.java | 15 +- .../apache/qpid/server/store/MessageStoreTest.java | 2 +- .../qpid/server/util/InternalBrokerBaseCase.java | 15 +- .../qpid/server/util/TestApplicationRegistry.java | 5 + .../qpid/server/logging/messages/LogMessages.vm | 28 +- .../templates/model/ProtocolVersionListClass.vm | 7 + .../java/org/apache/qpid/server/AlertingTest.java | 182 ------------- .../qpid/server/logging/AbstractTestLogging.java | 262 ++++++++++++++++++ .../apache/qpid/server/logging/AlertingTest.java | 202 ++++++++++++++ .../qpid/server/logging/ChannelLoggingTest.java | 281 +++++++++++++++++++ .../qpid/server/logging/ConnectionLoggingTest.java | 185 +++++++++++++ .../org/apache/qpid/test/utils/QpidTestCase.java | 150 +++++++++- .../main/java/org/apache/qpid/util/LogMonitor.java | 176 ++++++++++++ .../java/org/apache/qpid/util/LogMonitorTest.java | 302 +++++++++++++++++++++ qpid/java/test-profiles/010Excludes | 2 + 92 files changed, 2632 insertions(+), 1235 deletions(-) delete mode 100644 qpid/cpp/src/tests/allSegmentTypes.h create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java delete mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/AlertingTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index c1de5e2dec..96d5146c30 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -31,7 +31,8 @@ namespace amqp_0_10 { using sys::Mutex; Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient) - : frameQueueClosed(false), output(o), identifier(id), initialized(false), isClient(_isClient), buffered(0) + : frameQueueClosed(false), output(o), identifier(id), initialized(false), + isClient(_isClient), buffered(0), version(0,10) {} void Connection::setInputHandler(std::auto_ptr c) { @@ -44,7 +45,9 @@ size_t Connection::decode(const char* buffer, size_t size) { //read in protocol header framing::ProtocolInitiation pi; if (pi.decode(in)) { - //TODO: check the version is correct + if(!(pi==version)) + throw Exception(QPID_MSG("Unsupported version: " << pi + << " supported version " << version)); QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi << ")"); } initialized = true; @@ -128,7 +131,11 @@ void Connection::send(framing::AMQFrame& f) { } framing::ProtocolVersion Connection::getVersion() const { - return framing::ProtocolVersion(0,10); + return version; +} + +void Connection::setVersion(const framing::ProtocolVersion& v) { + version = v; } size_t Connection::getBuffered() const { diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h index 32aadff105..6fd51381fc 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h @@ -55,6 +55,7 @@ class Connection : public sys::ConnectionCodec, bool initialized; bool isClient; size_t buffered; + framing::ProtocolVersion version; public: QPID_BROKER_EXTERN Connection(sys::OutputControl&, const std::string& id, bool isClient); @@ -71,6 +72,10 @@ class Connection : public sys::ConnectionCodec, void send(framing::AMQFrame&); framing::ProtocolVersion getVersion() const; size_t getBuffered() const; + + /** Used by cluster code to set a special version on "update" connections. */ + // FIXME aconway 2009-07-30: find a cleaner mechanism for this. + void setVersion(const framing::ProtocolVersion&); }; }} // namespace qpid::amqp_0_10 diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index ac8f835398..0517ceca95 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -36,7 +36,6 @@ #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Vhost.h" #include "qpid/broker/System.h" -#include "qpid/broker/Timer.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" @@ -49,6 +48,7 @@ #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Runnable.h" +#include "qpid/sys/Timer.h" #include "qpid/RefCounted.h" #include "qpid/broker/AclModule.h" @@ -115,13 +115,14 @@ public: private: std::string getHome(); }; - + private: typedef std::map > ProtocolFactoryMap; void declareStandardExchange(const std::string& name, const std::string& type); boost::shared_ptr poller; + sys::Timer timer; Options config; ProtocolFactoryMap protocolFactories; std::auto_ptr store; @@ -132,7 +133,6 @@ public: ExchangeRegistry exchanges; LinkRegistry links; boost::shared_ptr factory; - Timer timer; DtxManager dtxManager; SessionManager sessionManager; management::ManagementAgent* managementAgent; @@ -148,8 +148,6 @@ public: boost::intrusive_ptr expiryPolicy; public: - - virtual ~Broker(); QPID_BROKER_EXTERN Broker(const Options& configuration); @@ -188,7 +186,7 @@ public: void setExpiryPolicy(const boost::intrusive_ptr& e) { expiryPolicy = e; } boost::intrusive_ptr getExpiryPolicy() { return expiryPolicy; } - + SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } @@ -197,7 +195,7 @@ public: management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - + /** Add to the broker's protocolFactorys */ void registerProtocolFactory(const std::string& name, boost::shared_ptr); @@ -229,7 +227,7 @@ public: boost::shared_ptr getConnectionFactory() { return factory; } void setConnectionFactory(boost::shared_ptr f) { factory = f; } - Timer& getTimer() { return timer; } + sys::Timer& getTimer() { return timer; } boost::function ()> getKnownBrokers; @@ -242,7 +240,5 @@ public: }; }} - - #endif /*!_Broker_*/ diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 91c95289ac..a1a3c6ada7 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -49,35 +49,25 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { -struct ConnectionTimeoutTask : public TimerTask { - Timer& timer; +struct ConnectionTimeoutTask : public sys::TimerTask { + sys::Timer& timer; Connection& connection; - AbsTime expires; - ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) : + ConnectionTimeoutTask(uint16_t hb, sys::Timer& t, Connection& c) : TimerTask(Duration(hb*2*TIME_SEC)), timer(t), - connection(c), - expires(AbsTime::now(), duration) + connection(c) {} - void touch() - { - expires = AbsTime(AbsTime::now(), duration); + void touch() { + restart(); } void fire() { - // This is the best we can currently do to avoid a destruction/fire race - if (isCancelled()) return; - if (expires < AbsTime::now()) { - // If we get here then we've not received any traffic in the timeout period - // Schedule closing the connection for the io thread - QPID_LOG(error, "Connection timed out: closing"); - connection.abort(); - } else { - reset(); - timer.add(this); - } + // If we get here then we've not received any traffic in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection timed out: closing"); + connection.abort(); } }; @@ -338,25 +328,22 @@ void Connection::setSecureConnection(SecureConnection* s) adapter.setSecureConnection(s); } -struct ConnectionHeartbeatTask : public TimerTask { - Timer& timer; +struct ConnectionHeartbeatTask : public sys::TimerTask { + sys::Timer& timer; Connection& connection; - ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : + ConnectionHeartbeatTask(uint16_t hb, sys::Timer& t, Connection& c) : TimerTask(Duration(hb*TIME_SEC)), timer(t), connection(c) {} void fire() { - // This is the best we can currently do to avoid a destruction/fire race - if (!isCancelled()) { - // Setup next firing - reset(); - timer.add(this); - - // Send Heartbeat - connection.sendHeartbeat(); - } + // Setup next firing + setupNextFire(); + timer.add(this); + + // Send Heartbeat + connection.sendHeartbeat(); } }; diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 731d36d83b..42409969b9 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -152,8 +152,8 @@ class Connection : public sys::ConnectionInputHandler, qmf::org::apache::qpid::broker::Connection* mgmtObject; LinkRegistry& links; management::ManagementAgent* agent; - Timer& timer; - boost::intrusive_ptr heartbeatTimer; + sys::Timer& timer; + boost::intrusive_ptr heartbeatTimer; boost::intrusive_ptr timeoutTimer; ErrorListener* errorListener; bool shadow; diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp index 9eb077716d..a2ab20ec44 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.cpp +++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/DtxTimeout.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Timer.h" #include "qpid/ptr_map.h" #include @@ -33,7 +34,7 @@ using qpid::ptr_map_ptr; using namespace qpid::broker; using namespace qpid::framing; -DtxManager::DtxManager(Timer& t) : store(0), timer(t) {} +DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(t) {} DtxManager::~DtxManager() {} @@ -130,8 +131,7 @@ void DtxManager::setTimeout(const std::string& xid, uint32_t secs) } timeout = intrusive_ptr(new DtxTimeout(secs, *this, xid)); record->setTimeout(timeout); - timer.add(boost::static_pointer_cast(timeout)); - + timer.add(timeout); } uint32_t DtxManager::getTimeout(const std::string& xid) diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h index 4b4fde3b39..680b62eeb2 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.h +++ b/qpid/cpp/src/qpid/broker/DtxManager.h @@ -24,9 +24,9 @@ #include #include "qpid/broker/DtxBuffer.h" #include "qpid/broker/DtxWorkRecord.h" -#include "qpid/broker/Timer.h" #include "qpid/broker/TransactionalStore.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/Timer.h" #include "qpid/sys/Mutex.h" namespace qpid { @@ -35,7 +35,7 @@ namespace broker { class DtxManager{ typedef boost::ptr_map WorkMap; - struct DtxCleanup : public TimerTask + struct DtxCleanup : public sys::TimerTask { DtxManager& mgr; const std::string& xid; @@ -47,14 +47,14 @@ class DtxManager{ WorkMap work; TransactionalStore* store; qpid::sys::Mutex lock; - Timer& timer; + qpid::sys::Timer& timer; void remove(const std::string& xid); DtxWorkRecord* getWork(const std::string& xid); DtxWorkRecord* createWork(std::string xid); public: - DtxManager(Timer&); + DtxManager(qpid::sys::Timer&); ~DtxManager(); void start(const std::string& xid, DtxBuffer::shared_ptr work); void join(const std::string& xid, DtxBuffer::shared_ptr work); diff --git a/qpid/cpp/src/qpid/broker/DtxTimeout.h b/qpid/cpp/src/qpid/broker/DtxTimeout.h index 3c468b3373..680a210e4f 100644 --- a/qpid/cpp/src/qpid/broker/DtxTimeout.h +++ b/qpid/cpp/src/qpid/broker/DtxTimeout.h @@ -22,7 +22,7 @@ #define _DtxTimeout_ #include "qpid/Exception.h" -#include "qpid/broker/Timer.h" +#include "qpid/sys/Timer.h" namespace qpid { namespace broker { @@ -31,12 +31,12 @@ class DtxManager; struct DtxTimeoutException : public Exception {}; -struct DtxTimeout : public TimerTask +struct DtxTimeout : public sys::TimerTask { const uint32_t timeout; DtxManager& mgr; const std::string xid; - + DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid); void fire(); }; diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index 3672f71515..c70392eb23 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -41,18 +41,18 @@ namespace _qmf = qmf::org::apache::qpid::broker; // factored: The persistence element and maintenance element // should be factored separately LinkRegistry::LinkRegistry () : - broker(0), + broker(0), timer(0), parent(0), store(0), passive(false), passiveChanged(false), realm("") { } LinkRegistry::LinkRegistry (Broker* _broker) : - broker(_broker), - parent(0), store(0), passive(false), passiveChanged(false), + broker(_broker), timer(&broker->getTimer()), + parent(0), store(0), passive(false), passiveChanged(false), realm(broker->getOptions().realm) { - timer.add (intrusive_ptr (new Periodic(*this))); + timer->add (new Periodic(*this)); } LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : @@ -61,7 +61,7 @@ LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : void LinkRegistry::Periodic::fire () { links.periodicMaintenance (); - links.timer.add (intrusive_ptr (new Periodic(links))); + links.timer->add (new Periodic(links)); } void LinkRegistry::periodicMaintenance () diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index 8d1a252f54..d1a4201c82 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -25,9 +25,9 @@ #include #include "qpid/broker/Bridge.h" #include "qpid/broker/MessageStore.h" -#include "qpid/broker/Timer.h" #include "qpid/Address.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include @@ -41,7 +41,7 @@ namespace broker { // Declare a timer task to manage the establishment of link connections and the // re-establishment of lost link connections. - struct Periodic : public TimerTask + struct Periodic : public sys::TimerTask { LinkRegistry& links; @@ -62,7 +62,7 @@ namespace broker { qpid::sys::Mutex lock; Broker* broker; - Timer timer; + sys::Timer* timer; management::Manageable* parent; MessageStore* store; bool passive; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index ee5831fed0..08ee133981 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -677,21 +677,25 @@ void Queue::setLastNodeFailure() { if (persistLastNode){ Mutex::ScopedLock locker(messageLock); - for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { - if (lastValueQueue) checkLvqReplace(*i); - // don't force a message twice to disk. - if(!i->payload->isStoredOnQueue(shared_from_this())) { - i->payload->forcePersistent(); - if (i->payload->isForcedPersistent() ){ - enqueue(0, i->payload); + try { + for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { + if (lastValueQueue) checkLvqReplace(*i); + // don't force a message twice to disk. + if(!i->payload->isStoredOnQueue(shared_from_this())) { + i->payload->forcePersistent(); + if (i->payload->isForcedPersistent() ){ + enqueue(0, i->payload); + } } - } - } + } + } catch (const std::exception& e) { + // Could not go into last node standing (for example journal not large enough) + QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what()); + } inLastNodeFailure = true; } } - // return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr msg) { @@ -1012,6 +1016,10 @@ void Queue::setPosition(SequenceNumber n) { sequence = n; } +SequenceNumber Queue::getPosition() { + return sequence; +} + int Queue::getEventMode() { return eventMode; } void Queue::setQueueEventManager(QueueEvents& mgr) diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 475766ae30..6703d06bbb 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -324,6 +324,9 @@ namespace qpid { * Used by cluster to replicate queues. */ void setPosition(framing::SequenceNumber pos); + /** return current position sequence number for the next message on the queue. + */ + framing::SequenceNumber getPosition(); int getEventMode(); void setQueueEventManager(QueueEvents&); QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp index 7a75ed013b..83f3f83520 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp @@ -26,15 +26,15 @@ namespace qpid { namespace broker { -QueueCleaner::QueueCleaner(QueueRegistry& q, Timer& t) : queues(q), timer(t) {} +QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {} void QueueCleaner::start(qpid::sys::Duration p) { - task = boost::intrusive_ptr(new Task(*this, p)); + task = new Task(*this, p); timer.add(task); } -QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : TimerTask(d), parent(p) {} +QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d), parent(p) {} void QueueCleaner::Task::fire() { @@ -44,7 +44,7 @@ void QueueCleaner::Task::fire() void QueueCleaner::fired() { queues.eachQueue(boost::bind(&Queue::purgeExpired, _1)); - task->reset(); + task->setupNextFire(); timer.add(task); } diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.h b/qpid/cpp/src/qpid/broker/QueueCleaner.h index 1ae72c19f5..c351cadd8a 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.h +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.h @@ -23,7 +23,7 @@ */ #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/Timer.h" +#include "qpid/sys/Timer.h" namespace qpid { namespace broker { @@ -35,10 +35,10 @@ class QueueRegistry; class QueueCleaner { public: - QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, Timer& timer); + QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer); QPID_BROKER_EXTERN void start(qpid::sys::Duration period); private: - class Task : public TimerTask + class Task : public sys::TimerTask { public: Task(QueueCleaner& parent, qpid::sys::Duration duration); @@ -46,10 +46,10 @@ class QueueCleaner private: QueueCleaner& parent; }; - - boost::intrusive_ptr task; + + boost::intrusive_ptr task; QueueRegistry& queues; - Timer& timer; + sys::Timer& timer; void fired(); }; diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 1c8e3ffebc..d4e5cfaa67 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -25,7 +25,7 @@ #include "qpid/broker/SessionManager.h" #include "qpid/broker/SessionHandler.h" #include "qpid/broker/RateFlowcontrol.h" -#include "qpid/broker/Timer.h" +#include "qpid/sys/Timer.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" #include "qpid/framing/AMQMethodBody.h" @@ -49,6 +49,7 @@ using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; using qpid::sys::AbsTime; +//using qpid::sys::Timer; namespace _qmf = qmf::org::apache::qpid::broker; SessionState::SessionState( @@ -206,10 +207,10 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN } } -struct ScheduledCreditTask : public TimerTask { - Timer& timer; +struct ScheduledCreditTask : public sys::TimerTask { + sys::Timer& timer; SessionState& sessionState; - ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t, + ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t, SessionState& s) : TimerTask(d), timer(t), @@ -218,15 +219,13 @@ struct ScheduledCreditTask : public TimerTask { void fire() { // This is the best we can currently do to avoid a destruction/fire race - if (!isCancelled()) { - sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this)); - } + sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this)); } void sendCredit() { if ( !sessionState.processSendCredit(0) ) { QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); - reset(); + setupNextFire(); timer.add(this); } } @@ -269,7 +268,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) if (rateFlowcontrol && frame.getBof() && frame.getBos()) { if ( !processSendCredit(1) ) { QPID_LOG(debug, getId() << ": Schedule sending credit"); - Timer& timer = getBroker().getTimer(); + sys::Timer& timer = getBroker().getTimer(); // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC); flowControlTimer = new ScheduledCreditTask(d, timer, *this); diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index d66c449b06..67fd4f4f38 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -48,6 +48,10 @@ namespace framing { class AMQP_ClientProxy; } +namespace sys { +class TimerTask; +} + namespace broker { class Broker; @@ -56,7 +60,6 @@ class Message; class SessionHandler; class SessionManager; class RateFlowcontrol; -struct TimerTask; /** * Broker-side session state includes session's handler chains, which @@ -153,7 +156,7 @@ class SessionState : public qpid::SessionState, // State used for producer flow control (rate limited) qpid::sys::Mutex rateLock; boost::scoped_ptr rateFlowcontrol; - boost::intrusive_ptr flowControlTimer; + boost::intrusive_ptr flowControlTimer; friend class SessionManager; }; diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index 53e8bc5239..f69032b26d 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -360,8 +360,11 @@ size_t TCPConnector::decode(const char* buffer, size_t size) if (!initiated) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { - //TODO: check the version is correct QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")"); + if(!(protocolInit==version)){ + throw Exception(QPID_MSG("Unsupported version: " << protocolInit + << " supported version " << version)); + } } initiated = true; } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index c5ac2ecfdc..ca3a7fa257 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -66,7 +66,7 @@ * * Events are either * - Connection events: non-0 connection number and are associated with a connection. - * - Cluster Events: 0 connection number, are not associated with a connectin. + * - Cluster Events: 0 connection number, are not associated with a connection. * * Events are further categorized as: * - Control: carries method frame(s) that affect cluster behavior. @@ -149,7 +149,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * sensible reporting of an attempt to mix different versions in a * cluster. */ -const uint32_t Cluster::CLUSTER_VERSION = 1; +const uint32_t Cluster::CLUSTER_VERSION = 2; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -163,7 +163,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } - void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } + void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } void shutdown() { cluster.shutdown(member, l); } @@ -869,15 +869,12 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } -void Cluster::errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&) { - // If we handle an errorCheck at this point (rather than in the - // ErrorCheck class) then we have processed succesfully past the - // point of the error. - if (state >= CATCHUP && type != ERROR_TYPE_NONE) { - QPID_LOG(notice, *this << " error " << frameSeq << " did not occur locally."); - mcast.mcastControl( - ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self); - } +void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) { + // If we see an errorCheck here (rather than in the ErrorCheck + // class) then we have processed succesfully past the point of the + // error. + if (state >= CATCHUP) // Don't respond pre catchup, we don't know what happened + error.respondNone(from, type, frameSeq); } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 87280f682b..a95f2ab60e 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -152,7 +152,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); - void errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&); + void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); void shutdown(const MemberId&, Lock&); diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index e4e0e50fe3..ca5237d6b1 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -71,7 +71,7 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : fra joiners[id] = url; } -ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_) +ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, framing::SequenceNumber frameSeq_) : frameSeq(frameSeq_) { std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive))); diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h index 1b891f73f0..5735a6335d 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.h +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -25,6 +25,7 @@ #include "qpid/cluster/types.h" #include "qpid/Url.h" #include "qpid/framing/ClusterConnectionMembershipBody.h" +#include "qpid/framing/SequenceNumber.h" #include #include @@ -53,7 +54,7 @@ class ClusterMap { ClusterMap(); ClusterMap(const MemberId& id, const Url& url, bool isReady); - ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq); + ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, framing::SequenceNumber frameSeq); /** Update from config change. *@return true if member set changed. @@ -92,8 +93,8 @@ class ClusterMap { */ static Set intersection(const Set& a, const Set& b); - uint64_t getFrameSeq() { return frameSeq; } - uint64_t incrementFrameSeq() { return ++frameSeq; } + framing::SequenceNumber getFrameSeq() { return frameSeq; } + framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; } /** Clear out all knowledge of joiners & members, just keep alive set */ void clearStatus() { joiners.clear(); members.clear(); } @@ -103,7 +104,7 @@ class ClusterMap { Map joiners, members; Set alive; - uint64_t frameSeq; + framing::SequenceNumber frameSeq; friend std::ostream& operator<<(std::ostream&, const Map&); friend std::ostream& operator<<(std::ostream&, const ClusterMap&); diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 03a06e1c09..4cc977d14a 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -311,7 +311,7 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str output.setSendMax(sendMax); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, const framing::SequenceNumber& frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); consumerNumbering.clear(); diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index e15c23ccf2..2799cc9fe1 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -122,7 +122,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax); - void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq); + void membership(const framing::FieldTable&, const framing::FieldTable&, const framing::SequenceNumber& frameSeq); void retractOffer(); diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp index 0c791cdf44..4ff8b0a4a3 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -38,24 +38,27 @@ using namespace framing; sys::ConnectionCodec* ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) { if (v == ProtocolVersion(0, 10)) - return new ConnectionCodec(out, id, cluster, false, false); - else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) - return new ConnectionCodec(out, id, cluster, true, false); // Catch-up connection + return new ConnectionCodec(v, out, id, cluster, false, false); + else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection + return new ConnectionCodec(v, out, id, cluster, true, false); return 0; } // Used for outgoing Link connections sys::ConnectionCodec* ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) { - return new ConnectionCodec(out, logId, cluster, false, true); + return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true); } -ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink) - : codec(out, logId, isLink), - interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink)) +ConnectionCodec::ConnectionCodec( + const ProtocolVersion& v, sys::OutputControl& out, + const std::string& logId, Cluster& cluster, bool catchUp, bool isLink +) : codec(out, logId, isLink), + interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink)) { std::auto_ptr ih(new ProxyInputHandler(interceptor)); codec.setInputHandler(ih); + codec.setVersion(v); } ConnectionCodec::~ConnectionCodec() {} diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h b/qpid/cpp/src/qpid/cluster/ConnectionCodec.h index ea01b7abb9..4ff738b603 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.h @@ -56,7 +56,8 @@ class ConnectionCodec : public sys::ConnectionCodec { sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); }; - ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink); + ConnectionCodec(const framing::ProtocolVersion&, sys::OutputControl& out, + const std::string& logId, Cluster& c, bool catchUp, bool isLink); ~ConnectionCodec(); // ConnectionCodec functions. diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp index 0a16d492e4..35be055d06 100644 --- a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -45,11 +45,11 @@ ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) { } void ErrorCheck::error( - Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms, const std::string& msg) + Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, const std::string& msg) { // Detected a local error, inform cluster and set error state. assert(t != ERROR_TYPE_NONE); // Must be an error. - assert(type == ERROR_TYPE_NONE); // Can only be called while processing + assert(type == ERROR_TYPE_NONE); // Can't be called when already in an error state. type = t; unresolved = ms; frameSeq = seq; @@ -59,7 +59,7 @@ void ErrorCheck::error( << " error " << frameSeq << " on " << c << ": " << msg << " must be resolved with: " << unresolved); mcast.mcastControl( - ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember()); + ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId()); // If there are already frames queued up by a previous error, review // them with respect to this new error. for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i)) @@ -74,41 +74,52 @@ void ErrorCheck::delivered(const EventFrame& e) { // Review a frame in the queue with respect to the current error. ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) { FrameQueue::iterator next = i+1; - if (isUnresolved()) { - const ClusterErrorCheckBody* errorCheck = 0; - if (i->frame.getBody()) - errorCheck = dynamic_cast( - i->frame.getMethod()); - if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error + if(!isUnresolved() || !i->frame.getBody() || !i->frame.getMethod()) + return next; // Only interested in control frames while unresolved. + const AMQMethodBody* method = i->frame.getMethod(); + if (method->isA()) { + const ClusterErrorCheckBody* errorCheck = + static_cast(method); + + if (errorCheck->getFrameSeq() == frameSeq) { // Addresses current error next = frames.erase(i); // Drop matching error check controls if (errorCheck->getType() < type) { // my error is worse than his QPID_LOG(critical, cluster << " error " << frameSeq << " did not occur on " << i->getMemberId()); - throw Exception("Aborted by failure that did not occur on all replicas"); + throw Exception(QPID_MSG("Error " << frameSeq + << " did not occur on all members")); } else { // his error is worse/same as mine. - QPID_LOG(notice, cluster << " error " << frameSeq + QPID_LOG(info, cluster << " error " << frameSeq << " resolved with " << i->getMemberId()); unresolved.erase(i->getMemberId()); checkResolved(); } } - else { - const ClusterConfigChangeBody* configChange = 0; - if (i->frame.getBody()) - configChange = dynamic_cast( - i->frame.getMethod()); - if (configChange) { - MemberSet members(ClusterMap::decode(configChange->getCurrent())); - QPID_LOG(debug, cluster << " apply config change to unresolved: " - << members); - MemberSet intersect; - set_intersection(members.begin(), members.end(), - unresolved.begin(), unresolved.end(), - inserter(intersect, intersect.begin())); - unresolved.swap(intersect); - checkResolved(); - } + else if (errorCheck->getFrameSeq() < frameSeq && errorCheck->getType() != NONE + && i->connectionId.getMember() != cluster.getId()) + { + // This error occured before the current error so we + // have processed past it. + next = frames.erase(i); // Drop the error check control + respondNone(i->connectionId.getMember(), errorCheck->getType(), + errorCheck->getFrameSeq()); + } + // if errorCheck->getFrameSeq() > frameSeq then leave it in the queue. + } + else if (method->isA()) { + const ClusterConfigChangeBody* configChange = + static_cast(method); + if (configChange) { + MemberSet members(ClusterMap::decode(configChange->getCurrent())); + QPID_LOG(debug, cluster << " apply config change to error " + << frameSeq << ": " << members); + MemberSet intersect; + set_intersection(members.begin(), members.end(), + unresolved.begin(), unresolved.end(), + inserter(intersect, intersect.begin())); + unresolved.swap(intersect); + checkResolved(); } } return next; @@ -117,10 +128,10 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& void ErrorCheck::checkResolved() { if (unresolved.empty()) { // No more potentially conflicted members, we're clear. type = ERROR_TYPE_NONE; - QPID_LOG(notice, cluster << " error " << frameSeq << " resolved."); + QPID_LOG(info, cluster << " error " << frameSeq << " resolved."); } else - QPID_LOG(notice, cluster << " error " << frameSeq + QPID_LOG(info, cluster << " error " << frameSeq << " must be resolved with " << unresolved); } @@ -131,4 +142,15 @@ EventFrame ErrorCheck::getNext() { return e; } +void ErrorCheck::respondNone(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq) { + // Don't respond to non-errors or to my own errors. + if (type == ERROR_TYPE_NONE || from == cluster.getId()) + return; + QPID_LOG(info, cluster << " error " << frameSeq << " did not occur locally."); + mcast.mcastControl( + ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), + cluster.getId() + ); +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h index 550cd36b75..09028391ac 100644 --- a/qpid/cpp/src/qpid/cluster/ErrorCheck.h +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h @@ -25,6 +25,7 @@ #include "qpid/cluster/types.h" #include "qpid/cluster/Multicaster.h" #include "qpid/framing/enum.h" +#include "qpid/framing/SequenceNumber.h" #include #include #include @@ -49,11 +50,12 @@ class ErrorCheck public: typedef std::set MemberSet; typedef framing::cluster::ErrorType ErrorType; + typedef framing::SequenceNumber SequenceNumber; ErrorCheck(Cluster&); /** A local error has occured */ - void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&, + void error(Connection&, ErrorType, SequenceNumber frameSeq, const MemberSet&, const std::string& msg); /** Called when a frame is delivered */ @@ -66,7 +68,8 @@ class ErrorCheck bool isUnresolved() const { return type != NONE; } - + /** Respond to an error check saying we had no error. */ + void respondNone(const MemberId&, uint8_t type, SequenceNumber frameSeq); private: static const ErrorType NONE = framing::cluster::ERROR_TYPE_NONE; @@ -78,7 +81,7 @@ class ErrorCheck Multicaster& mcast; FrameQueue frames; MemberSet unresolved; - uint64_t frameSeq; + SequenceNumber frameSeq; ErrorType type; Connection* connection; }; diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp index 717e1a47cd..190eeb7293 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -19,21 +19,21 @@ * */ +#include "qpid/broker/Message.h" #include "qpid/cluster/ExpiryPolicy.h" #include "qpid/cluster/Multicaster.h" #include "qpid/framing/ClusterMessageExpiredBody.h" #include "qpid/sys/Time.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/Timer.h" +#include "qpid/sys/Timer.h" #include "qpid/log/Statement.h" namespace qpid { namespace cluster { -ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t) +ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t) : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} -struct ExpiryTask : public broker::TimerTask { +struct ExpiryTask : public sys::TimerTask { ExpiryTask(const boost::intrusive_ptr& policy, uint64_t id, sys::AbsTime when) : TimerTask(when), expiryPolicy(policy), expiryId(id) {} void fire() { expiryPolicy->sendExpire(expiryId); } diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h index 4ec41c93bc..bdbe3a61dc 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -33,10 +33,13 @@ namespace qpid { namespace broker { -class Timer; class Message; } +namespace sys { +class Timer; +} + namespace cluster { class Multicaster; @@ -46,7 +49,7 @@ class Multicaster; class ExpiryPolicy : public broker::ExpiryPolicy { public: - ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&); + ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&); void willExpire(broker::Message&); bool hasExpired(broker::Message&); @@ -78,7 +81,7 @@ class ExpiryPolicy : public broker::ExpiryPolicy boost::intrusive_ptr expiredPolicy; Multicaster& mcast; MemberId memberId; - broker::Timer& timer; + sys::Timer& timer; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h index cae506fc06..29e368b671 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.h +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -46,8 +46,8 @@ class AMQFrame : public AMQDataBlock QPID_COMMON_EXTERN AMQBody* getBody(); QPID_COMMON_EXTERN const AMQBody* getBody() const; - AMQMethodBody* getMethod() { return getBody()->getMethod(); } - const AMQMethodBody* getMethod() const { return getBody()->getMethod(); } + AMQMethodBody* getMethod() { return getBody() ? getBody()->getMethod() : 0; } + const AMQMethodBody* getMethod() const { return getBody() ? getBody()->getMethod() : 0; } void setMethod(ClassId c, MethodId m); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 3ea38ca45e..2df10b1e95 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -62,7 +62,6 @@ ManagementAgent::ManagementAgent () : ManagementAgent::~ManagementAgent () { - timer.stop(); { Mutex::ScopedLock lock (userLock); @@ -90,9 +89,10 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, dataDir = _dataDir; interval = _interval; broker = _broker; + timer = &_broker->getTimer(); threadPoolSize = _threads; ManagementObject::maxThreads = threadPoolSize; - timer.add (intrusive_ptr (new Periodic(*this, interval))); + timer->add (new Periodic(*this, interval)); // Get from file or generate and save to file. if (dataDir.empty()) @@ -219,7 +219,7 @@ ManagementAgent::Periodic::~Periodic () {} void ManagementAgent::Periodic::fire () { - agent.timer.add (intrusive_ptr (new Periodic (agent, agent.interval))); + agent.timer->add (new Periodic (agent, agent.interval)); agent.periodicProcessing (); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index c9c9b3f66d..84e84e3daa 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -24,9 +24,9 @@ #include "qpid/broker/BrokerImportExport.h" #include "qpid/Options.h" #include "qpid/broker/Exchange.h" -#include "qpid/broker/Timer.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/Timer.h" #include "qpid/broker/ConnectionToken.h" #include "qpid/management/ManagementObject.h" #include "qpid/management/ManagementEvent.h" @@ -98,7 +98,7 @@ public: void disallow(const std::string& className, const std::string& methodName, const std::string& message); private: - struct Periodic : public qpid::broker::TimerTask + struct Periodic : public qpid::sys::TimerTask { ManagementAgent& agent; @@ -183,12 +183,12 @@ private: framing::Uuid uuid; sys::Mutex addLock; sys::Mutex userLock; - qpid::broker::Timer timer; qpid::broker::Exchange::shared_ptr mExchange; qpid::broker::Exchange::shared_ptr dExchange; std::string dataDir; uint16_t interval; qpid::broker::Broker* broker; + qpid::sys::Timer* timer; uint16_t bootSequence; uint32_t nextObjectId; uint32_t brokerBank; diff --git a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp index 1a3ce1c069..b7d52372f4 100644 --- a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -72,6 +72,7 @@ void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueu FieldTable& headers = msg->getProperties()->getApplicationHeaders(); headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE); + headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position); route(msg); } diff --git a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp index c0cc36efe3..b5911bb71e 100644 --- a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -83,15 +83,29 @@ void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); Queue::shared_ptr queue = queues.find(queueName); if (queue) { - FieldTable& headers = msg.getMessage().getProperties()->getApplicationHeaders(); - headers.erase(REPLICATION_TARGET_QUEUE); - headers.erase(REPLICATION_EVENT_SEQNO); - headers.erase(REPLICATION_EVENT_TYPE); - msg.deliverTo(queue); - QPID_LOG(debug, "Enqueued replicated message onto " << queueName); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes(); - mgmtExchange->inc_byteRoutes( msg.contentSize()); + + SequenceNumber seqno1(args->getAsInt(QUEUE_MESSAGE_POSITION)); + + // note that queue will ++ before enqueue. + if (queue->getPosition() > --seqno1) // test queue.pos < seqnumber + { + QPID_LOG(error, "Cannot enqueue replicated message. Destination Queue " << queueName << " ahead of source queue"); + mgmtExchange->inc_msgDrops(); + mgmtExchange->inc_byteDrops(msg.contentSize()); + } else { + queue->setPosition(seqno1); + + FieldTable& headers = msg.getMessage().getProperties()->getApplicationHeaders(); + headers.erase(REPLICATION_TARGET_QUEUE); + headers.erase(REPLICATION_EVENT_SEQNO); + headers.erase(REPLICATION_EVENT_TYPE); + headers.erase(QUEUE_MESSAGE_POSITION); + msg.deliverTo(queue); + QPID_LOG(debug, "Enqueued replicated message onto " << queueName); + if (mgmtExchange != 0) { + mgmtExchange->inc_msgRoutes(); + mgmtExchange->inc_byteRoutes( msg.contentSize()); + } } } else { QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist"); diff --git a/qpid/cpp/src/qpid/replication/constants.h b/qpid/cpp/src/qpid/replication/constants.h index fb7085c570..c5ba7d3d6a 100644 --- a/qpid/cpp/src/qpid/replication/constants.h +++ b/qpid/cpp/src/qpid/replication/constants.h @@ -26,6 +26,7 @@ const std::string REPLICATION_EVENT_TYPE("qpid.replication.type"); const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno"); const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue"); const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message"); +const std::string QUEUE_MESSAGE_POSITION("qpid.replication.queue.position"); const int ENQUEUE(1); const int DEQUEUE(2); diff --git a/qpid/cpp/src/qpid/sys/Timer.cpp b/qpid/cpp/src/qpid/sys/Timer.cpp index d77f4a0b1e..5ed117db93 100644 --- a/qpid/cpp/src/qpid/sys/Timer.cpp +++ b/qpid/cpp/src/qpid/sys/Timer.cpp @@ -20,7 +20,8 @@ */ #include "qpid/sys/Timer.h" #include "qpid/sys/Mutex.h" -#include +#include "qpid/log/Statement.h" + #include using boost::intrusive_ptr; @@ -58,6 +59,8 @@ void TimerTask::setupNextFire() { if (period && readyToFire()) { nextFireTime = AbsTime(nextFireTime, period); cancelled = false; + } else { + QPID_LOG(error, "Couldn't setup next timer firing: " << Duration(nextFireTime, AbsTime::now()) << "[" << period << "]"); } } @@ -91,24 +94,39 @@ void Timer::run() } else { intrusive_ptr t = tasks.top(); tasks.pop(); + assert(!(t->nextFireTime < t->sortTime)); + + // warn on extreme lateness + AbsTime start(AbsTime::now()); + Duration late(t->sortTime, start); + if (late > 500 * TIME_MSEC) { + QPID_LOG(warning, "Timer delayed by " << late / TIME_MSEC << "ms"); + } { ScopedLock l(t->callbackLock); if (t->cancelled) { continue; - } else if(t->readyToFire()) { + } else if(Duration(t->nextFireTime, start) >= 0) { Monitor::ScopedUnlock u(monitor); t->fireTask(); + // Warn on callback overrun + AbsTime end(AbsTime::now()); + Duration overrun(tasks.top()->nextFireTime, end); + if (overrun > 1 * TIME_MSEC) { + QPID_LOG(warning, + "Timer callback overran by " << overrun / TIME_MSEC << "ms [taking " + << Duration(start, end) << "]"); + } continue; } else { // If the timer was adjusted into the future it might no longer // be the next event, so push and then get top to make sure // You can only push events into the future - assert(!(t->nextFireTime < t->sortTime)); t->sortTime = t->nextFireTime; tasks.push(t); } } - monitor.wait(tasks.top()->nextFireTime); + monitor.wait(tasks.top()->sortTime); } } } diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 16cdfcbd72..563068a018 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -65,6 +65,7 @@ unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \ $(lib_client) $(lib_broker) $(lib_console) unit_test_SOURCES= unit_test.cpp unit_test.h \ + ClientSessionTest.cpp \ BrokerFixture.h SocketProxy.h \ exception_test.cpp \ RefCounted.cpp \ @@ -75,7 +76,6 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ QueueOptionsTest.cpp \ InlineAllocator.cpp \ InlineVector.cpp \ - ClientSessionTest.cpp \ SequenceSet.cpp \ StringUtils.cpp \ IncompleteMessageList.cpp \ diff --git a/qpid/cpp/src/tests/PartialFailure.cpp b/qpid/cpp/src/tests/PartialFailure.cpp index 7169e53a16..f77a1401f8 100644 --- a/qpid/cpp/src/tests/PartialFailure.cpp +++ b/qpid/cpp/src/tests/PartialFailure.cpp @@ -82,10 +82,31 @@ void queueAndSub(Client& c) { c.subs.subscribe(c.lq, c.name); } +// Handle near-simultaneous errors +QPID_AUTO_TEST_CASE(testCoincidentErrors) { + ClusterFixture cluster(2, updateArgs, -1); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + + c0.session.queueDeclare("q", durable=true); + { + ScopedSuppressLogging allQuiet; + async(c0.session).messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception]", "q")); + async(c1.session).messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception]", "q")); + + int alive=0; + try { Client c00(cluster[0], "c00"); ++alive; } catch (...) {} + try { Client c11(cluster[1], "c11"); ++alive; } catch (...) {} + + BOOST_CHECK_EQUAL(alive, 1); + } +} + +#if 0 // FIXME aconway 2009-07-30: // Verify normal cluster-wide errors. QPID_AUTO_TEST_CASE(testNormalErrors) { // FIXME aconway 2009-04-10: Would like to put a scope just around - // the statements expected to fail (in BOOST_CHECK_THROW) but that + // the statements expected to fail (in BOOST_CHECK_yTHROW) but that // sproadically lets out messages, possibly because they're in // Connection thread. @@ -96,7 +117,7 @@ QPID_AUTO_TEST_CASE(testNormalErrors) { { ScopedSuppressLogging allQuiet; - queueAndSub(c0); + queueAndsub(c0); c0.session.messageTransfer(content=Message("x", "c0")); BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x"); @@ -234,5 +255,5 @@ QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) { } } #endif - +#endif // FIXME aconway 2009-07-30: QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index be4a87d2ab..b70afa52a7 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -19,6 +19,7 @@ * */ #include "unit_test.h" +#include "test_tools.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" @@ -275,11 +276,13 @@ class TestMessageStoreOC : public NullMessageStore uint enqCnt; uint deqCnt; + bool error; virtual void dequeue(TransactionContext*, const boost::intrusive_ptr& /*msg*/, const PersistableQueue& /*queue*/) { + if (error) throw Exception("Dequeue error test"); deqCnt++; } @@ -287,10 +290,16 @@ class TestMessageStoreOC : public NullMessageStore const boost::intrusive_ptr& /*msg*/, const PersistableQueue& /* queue */) { + if (error) throw Exception("Enqueue error test"); enqCnt++; } - TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0) {} + void createError() + { + error=true; + } + + TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {} ~TestMessageStoreOC(){} }; @@ -689,6 +698,30 @@ not requeued to the store. } -QPID_AUTO_TEST_SUITE_END() +QPID_AUTO_TEST_CASE(testLastNodeJournalError){ +/* +simulate store excption going into last node standing + +*/ + TestMessageStoreOC testStore; + client::QueueOptions args; + // set queue mode + args.setPersistLastNode(); + + Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); + intrusive_ptr received; + queue1->configure(args); + + // check requeue 1 + intrusive_ptr msg1 = create_message("e", "C"); + + queue1->deliver(msg1); + testStore.createError(); + + ScopedSuppressLogging sl; // Suppress messages for expected errors. + queue1->setLastNodeFailure(); + BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); + +}QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/allSegmentTypes.h b/qpid/cpp/src/tests/allSegmentTypes.h deleted file mode 100644 index e942250c89..0000000000 --- a/qpid/cpp/src/tests/allSegmentTypes.h +++ /dev/null @@ -1,128 +0,0 @@ -#ifndef TESTS_ALLSEGMENTTYPES_H -#define TESTS_ALLSEGMENTTYPES_H -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/// -/// This file was automatically generated from the AMQP specification. -/// Do not edit. -/// - - -#include "qpid/amqp_0_10/specification.h" -#include "qpid/amqp_0_10/Header.h" -#include "qpid/amqp_0_10/Body.h" - -using namespace qpid::amqp_0_10; - -template size_t allSegmentTypes(Op& op) { - op(Header()); - op(Body()); - op(ControlHolder(connection::Start())); - op(ControlHolder(connection::StartOk())); - op(ControlHolder(connection::Secure())); - op(ControlHolder(connection::SecureOk())); - op(ControlHolder(connection::Tune())); - op(ControlHolder(connection::TuneOk())); - op(ControlHolder(connection::Open())); - op(ControlHolder(connection::OpenOk())); - // op(ControlHolder(connection::Redirect())); // known-hosts array - op(ControlHolder(connection::Heartbeat())); - // op(ControlHolder(connection::Close())); // class/method dropped - op(ControlHolder(connection::CloseOk())); - op(ControlHolder(session::Attach())); - op(ControlHolder(session::Attached())); - op(ControlHolder(session::Detach())); - op(ControlHolder(session::Detached())); - op(ControlHolder(session::RequestTimeout())); - op(ControlHolder(session::Timeout())); - op(ControlHolder(session::CommandPoint())); - // op(ControlHolder(session::Expected())); // fragments array encoding problem - // op(ControlHolder(session::Confirmed())); // fragments array encoding problem - op(ControlHolder(session::Completed())); - op(ControlHolder(session::KnownCompleted())); - op(ControlHolder(session::Flush())); - op(ControlHolder(session::Gap())); - // FIXME aconway 2008-04-15: command encoding, fix headers, fix sized structs. - op(CommandHolder(execution::Sync())); - op(CommandHolder(execution::Result())); - - // FIXME aconway 2008-04-16: investigate remaining failures. - // op(CommandHolder(execution::Exception())); - op(CommandHolder(message::Transfer())); - op(CommandHolder(message::Accept())); - // op(CommandHolder(message::Reject())); - op(CommandHolder(message::Release())); - op(CommandHolder(message::Acquire())); - // op(CommandHolder(message::Resume())); - op(CommandHolder(message::Subscribe())); - op(CommandHolder(message::Cancel())); - op(CommandHolder(message::SetFlowMode())); - op(CommandHolder(message::Flow())); - op(CommandHolder(message::Flush())); - op(CommandHolder(message::Stop())); - op(CommandHolder(tx::Select())); - op(CommandHolder(tx::Commit())); - op(CommandHolder(tx::Rollback())); - op(CommandHolder(dtx::Select())); - // op(CommandHolder(dtx::Start())); - // op(CommandHolder(dtx::End())); - // op(CommandHolder(dtx::Commit())); - // op(CommandHolder(dtx::Forget())); - // op(CommandHolder(dtx::GetTimeout())); - // op(CommandHolder(dtx::Prepare())); - // op(CommandHolder(dtx::Recover())); - // op(CommandHolder(dtx::Rollback())); - // op(CommandHolder(dtx::SetTimeout())); - op(CommandHolder(exchange::Declare())); - op(CommandHolder(exchange::Delete())); - op(CommandHolder(exchange::Query())); - op(CommandHolder(exchange::Bind())); - op(CommandHolder(exchange::Unbind())); - op(CommandHolder(exchange::Bound())); - op(CommandHolder(queue::Declare())); - op(CommandHolder(queue::Delete())); - op(CommandHolder(queue::Purge())); - op(CommandHolder(queue::Query())); - // op(CommandHolder(file::Qos())); - // op(CommandHolder(file::QosOk())); -// op(CommandHolder(file::Consume())); -// op(CommandHolder(file::ConsumeOk())); -// op(CommandHolder(file::Cancel())); -// op(CommandHolder(file::Open())); -// op(CommandHolder(file::OpenOk())); -// op(CommandHolder(file::Stage())); -// op(CommandHolder(file::Publish())); -// op(CommandHolder(file::Return())); -// op(CommandHolder(file::Deliver())); -// op(CommandHolder(file::Ack())); -// op(CommandHolder(file::Reject())); -// op(CommandHolder(stream::Qos())); -// op(CommandHolder(stream::QosOk())); -// op(CommandHolder(stream::Consume())); -// op(CommandHolder(stream::ConsumeOk())); -// op(CommandHolder(stream::Cancel())); -// op(CommandHolder(stream::Publish())); -// op(CommandHolder(stream::Return())); -// op(CommandHolder(stream::Deliver())); - return 0; -} -#endif /*!TESTS_ALLSEGMENTTYPES_H*/ diff --git a/qpid/cpp/src/tests/replication_test b/qpid/cpp/src/tests/replication_test index 6e0c1c8d3b..8b3022b260 100755 --- a/qpid/cpp/src/tests/replication_test +++ b/qpid/cpp/src/tests/replication_test @@ -56,10 +56,12 @@ if test -d ${PYTHON_DIR} && test -f ../.libs/replicating_listener.so && test -f $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-b --generate-queue-events 2 $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-c --generate-queue-events 1 $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 2 + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-e --generate-queue-events 1 $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-b $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e #queue-d deliberately not declared on DR; this error case should be handled #publish and consume from test queues on broker A: @@ -89,6 +91,12 @@ if test -d ${PYTHON_DIR} && test -f ../.libs/replicating_listener.so && test -f ./receiver --port $BROKER_A --queue queue-c --messages 10 > /dev/null ./receiver --port $BROKER_A --queue queue-d > /dev/null + + # What we are doing is putting a message on the end of repliaction queue & waiting for it on remote side + # making sure all the messages have been flushed from the replication queue. + echo dummy | ./sender --port $BROKER_A --routing-key queue-e --send-eos 1 + ./receiver --port $BROKER_B --queue queue-e --messages 1 > /dev/null + #shutdown broker A then check that broker Bs versions of the queues are as expected ../qpidd -q --port $BROKER_A unset BROKER_A @@ -98,7 +106,6 @@ if test -d ${PYTHON_DIR} && test -f ../.libs/replicating_listener.so && test -f ./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl ./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl - stop_brokers tail -5 queue-a-input.repl > queue-a-expected.repl tail -10 queue-b-input.repl > queue-b-expected.repl @@ -108,6 +115,60 @@ if test -d ${PYTHON_DIR} && test -f ../.libs/replicating_listener.so && test -f grep 'queue-d does not exist' replication-dest.log > /dev/null || echo "WARNING: Expected error to be logged!" + stop_brokers + + # now check offsets working (enqueue based on position being set, not queue abs position) + + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable info+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port + BROKER_A=`cat qpidd.port` + + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port + BROKER_B=`cat qpidd.port` + + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication + $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication + + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-e --generate-queue-events 2 + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 1 + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-d + + i=1 + while [ $i -le 10 ]; do + echo Message $i for A >> queue-e-input.repl + i=`expr $i + 1` + done + + ./sender --port $BROKER_A --routing-key queue-e --send-eos 1 < queue-e-input.repl + ./receiver --port $BROKER_A --queue queue-e --messages 10 > /dev/null + + # What we are doing is putting a message on the end of repliaction queue & waiting for it on remote side + # making sure all the messages have been flushed from the replication queue. + echo dummy | ./sender --port $BROKER_A --routing-key queue-d --send-eos 1 + ./receiver --port $BROKER_B --queue queue-d --messages 1 > /dev/null + + # now check offsets working + ../qpidd -q --port $BROKER_B + unset BROKER_B + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port + BROKER_B=`cat qpidd.port` + + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication + $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication + # now send another 15 + i=11 + while [ $i -le 15 ]; do + echo Message $i for A >> queue-e1-input.repl + i=`expr $i + 1` + done + ./sender --port $BROKER_A --routing-key queue-e --send-eos 1 < queue-e1-input.repl + + ./receiver --port $BROKER_B --queue queue-e > queue-e-backup.repl + diff queue-e-backup.repl queue-e1-input.repl || FAIL=1 + + stop_brokers + if [ x$FAIL != x ]; then echo replication test failed: expectations not met! exit 1 diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 92917dcfa6..1e5b091a87 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -68,7 +68,7 @@ - + @@ -170,7 +170,7 @@ - + diff --git a/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm b/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm index 18d90fab29..bcf7db345b 100644 --- a/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm +++ b/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm @@ -39,12 +39,14 @@ public class ProtocolVersion implements Comparable { private final byte _majorVersion; private final byte _minorVersion; + private final String _stringFormat; public ProtocolVersion(byte majorVersion, byte minorVersion) { _majorVersion = majorVersion; _minorVersion = minorVersion; + _stringFormat = _majorVersion+"-"+_minorVersion; } public byte getMajorVersion() @@ -57,6 +59,10 @@ public class ProtocolVersion implements Comparable return _minorVersion; } + public String toString() + { + return _stringFormat; + } public int compareTo(Object o) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index c6e370206e..e963fb23ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -58,6 +58,12 @@ import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.logging.actors.AMQPChannelActor; +import org.apache.qpid.server.logging.actors.CurrentActor; public class AMQChannel { @@ -112,13 +118,22 @@ public class AMQChannel // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; - private boolean _closing; + private boolean _closing; + + private LogActor _actor; + private LogSubject _logSubject; public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { _session = session; _channelId = channelId; + + _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); + _logSubject = new ChannelLogSubject(this); + + _actor.message(ChannelMessages.CHN_1001()); + _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); @@ -386,6 +401,8 @@ public class AMQChannel private void setClosing(boolean closing) { _closing = closing; + + CurrentActor.get().message(_logSubject, ChannelMessages.CHN_1003()); } private void unsubscribeAllConsumers() throws AMQException @@ -789,6 +806,8 @@ public class AMQChannel boolean wasSuspended = _suspended.getAndSet(suspended); if (wasSuspended != suspended) { + _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started")); + if (wasSuspended) { // may need to deliver queued messages @@ -891,6 +910,8 @@ public class AMQChannel public void setCredit(final long prefetchSize, final int prefetchCount) { + //fixme +// _actor.message(ChannelMessages.CHN_100X(prefetchSize, prefetchCount); _creditManager.setCreditLimits(prefetchSize, prefetchCount); } @@ -942,4 +963,9 @@ public class AMQChannel { return _recordDeliveryMethod; } + + public LogActor getLogActor() + { + return _actor; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index e56f1cda12..90b4590d4c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -256,7 +256,7 @@ public class ServerConfiguration implements SignalHandler // Our configuration class needs to make the interpolate method // public so it can be called below from the config method. - private static class MyConfiguration extends CompositeConfiguration + public static class MyConfiguration extends CompositeConfiguration { public String interpolate(String obj) { @@ -264,7 +264,7 @@ public class ServerConfiguration implements SignalHandler } } - private final static Configuration flatConfig(File file) throws ConfigurationException + public final static Configuration flatConfig(File file) throws ConfigurationException { // We have to override the interpolate methods so that // interpolation takes place accross the entirety of the diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java index 203a5d160d..d5683b3c7b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java @@ -40,4 +40,21 @@ public interface LogActor * @param message The message to log */ public void message(LogSubject subject, LogMessage message); -} \ No newline at end of file + + /** + * Logs the specified LogMessage against this actor + * + * Currently logging has a global setting however this will later be revised and + * as such the LogActor will need to take into consideration any new configuration + * as a means of enabling the logging of LogActors and LogSubjects. + * + * @param message The message to log + */ + public void message(LogMessage message); + + /** + * + * @return the RootMessageLogger that is currently in use by this LogActor. + */ + RootMessageLogger getRootMessageLogger(); +} \ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java index cd7992faa7..5ac5eab6c4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java @@ -37,6 +37,14 @@ public interface RootMessageLogger */ boolean isMessageEnabled(LogActor actor, LogSubject subject); + /** + * Determine if the LogActor should be generating log messages. + * + * @param actor The actor requesting the logging + * + * @return boolean true if the message should be logged. + */ + boolean isMessageEnabled(LogActor actor); /** * Log the raw message to the configured logger. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java index 1c2b4e4046..a3bf276d1e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java @@ -40,6 +40,11 @@ public class RootMessageLoggerImpl implements RootMessageLogger return _enabled; } + public boolean isMessageEnabled(LogActor actor) + { + return _enabled; + } + public void rawMessage(String message) { _rawLogger.rawMessage(MESSAGE + message); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java index 95f2dc9ff6..4502710dd6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java @@ -32,6 +32,10 @@ public abstract class AbstractActor implements LogActor public AbstractActor(RootMessageLogger rootLogger) { + if(rootLogger == null) + { + throw new NullPointerException("RootMessageLogger cannot be null"); + } _rootLogger = rootLogger; } @@ -42,4 +46,18 @@ public abstract class AbstractActor implements LogActor _rootLogger.rawMessage(_logString + String.valueOf(subject) + message); } } + + public void message(LogMessage message) + { + if (_rootLogger.isMessageEnabled(this)) + { + _rootLogger.rawMessage(_logString + message); + } + } + + public RootMessageLogger getRootMessageLogger() + { + return _rootLogger; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties index 8e9ee3720c..24df17683f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties @@ -235,12 +235,12 @@ MST-1006 = Recovery Complete[ : {0}] #Connection # 0 - Client id # 1 - Protocol Version -CON-1001 = Open : Client ID {0}[ : Protocol Version : {1}] +CON-1001 = Open[ : Client ID : {0}][ : Protocol Version : {1}] CON-1002 = Close #Channel -# 0 - count -CHN-1001 = Create : Prefetch {0, number} +CHN-1001 = Create +# : Prefetch Size {0,number} : Count {1,number} # 0 - flow CHN-1002 = Flow {0} CHN-1003 = Close diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java index 3774155626..f996576f31 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java @@ -41,6 +41,7 @@ public class Log4jMessageLogger implements RawMessageLogger _level = Level.toLevel(level); _rawMessageLogger = Logger.getLogger(logger); + _rawMessageLogger.setLevel(_level); } public void rawMessage(String message) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java new file mode 100644 index 0000000000..28d64de74e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.store.MessageStore; + +public class MessagesStoreLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the MessagesStoreLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Virtualhost Name + * 1 - Message Store Type + */ + protected static String BINDING_FORMAT = "vh(/{0})/ms({1})"; + + /** Create an ExchangeLogSubject that Logs in the following format. */ + public MessagesStoreLogSubject(VirtualHost vhost, MessageStore store) + { + setLogStringWithFormat(BINDING_FORMAT, vhost.getName(), + store.getClass().getSimpleName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 5f1615351b..2a6cab6048 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -54,6 +54,10 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.actors.AMQPConnectionActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -139,6 +143,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private final long _sessionID = idGenerator.getAndIncrement(); private AMQPConnectionActor _actor; + private LogSubject _logSubject; public ManagedObject getManagedObject() { @@ -156,6 +161,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); + _actor.message(ConnectionMessages.CON_1001(null, null, false, false)); + try { IoServiceConfig config = session.getServiceConfig(); @@ -171,18 +178,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, - AMQStateManager stateManager) throws AMQException - { - _stateManager = stateManager; - _minaProtocolSession = session; - session.setAttachment(this); - - _codecFactory = codecFactory; - - _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); - } - private AMQProtocolSessionMBean createMBean() throws AMQException { try @@ -211,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return _sessionID; } + public LogActor getLogActor() + { + return _actor; + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; @@ -236,42 +236,54 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable int channelId = frame.getChannel(); AMQBody body = frame.getBodyFrame(); - if (_logger.isDebugEnabled()) + //Look up the Channel's Actor and set that as the current actor + // If that is not available then we can use the ConnectionActor + // that is associated with this AMQMPSession. + LogActor channelActor = null; + if (_channelMap.get(channelId) != null) { - _logger.debug("Frame Received: " + frame); + channelActor = _channelMap.get(channelId).getLogActor(); } + CurrentActor.set(channelActor == null ? _actor : channelActor); - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) + try { - if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) + if (_logger.isDebugEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } + _logger.debug("Frame Received: " + frame); } - else + + // Check that this channel is not closing + if (channelAwaitingClosure(channelId)) { - if (_logger.isInfoEnabled()) + if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) { - _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame); + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); + } } + else + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame); + } - closeProtocolSession(); - return; + closeProtocolSession(); + return; + } } - } - CurrentActor.set(_actor); - try - { - body.handle(channelId, this); - } - catch (AMQException e) - { - closeChannel(channelId); - throw e; + try + { + body.handle(channelId, this); + } + catch (AMQException e) + { + closeChannel(channelId); + throw e; + } } finally { @@ -285,6 +297,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); try { + // Log incomming protocol negotiation request + _actor.message(ConnectionMessages.CON_1001(null, pi._protocolMajor + "-" + pi._protocolMinor, false, true)); + ProtocolVersion pv = pi.checkVersion(); // Fails if not correct // This sets the protocol version (and hence framing classes) for this session. @@ -643,6 +658,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable if (!_closed) { _closed = true; + + _actor.message(ConnectionMessages.CON_1002()); if (_virtualHost != null) { @@ -770,7 +787,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null) { - setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE))); + String clientID = _clientProperties.getString(CLIENT_PROPERTIES_INSTANCE); + setContextKey(new AMQShortString(clientID)); + + // Log the Opening of the connection for this client + _actor.message(ConnectionMessages.CON_1001(clientID, _protocolVersion.toString(), true, true)); } if (_clientProperties.getString(ClientProperties.version.toString()) != null) @@ -829,6 +850,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _virtualHost = virtualHost; _actor.virtualHostSelected(this); + _logSubject = new ConnectionLogSubject(this); _virtualHost.getConnectionRegistry().registerConnection(this); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index b15e6a4219..21fd1ce6b4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -29,6 +29,8 @@ import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.PrincipalHolder; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -39,6 +41,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin { long getSessionID(); + LogActor getLogActor(); + public static final class ProtocolSessionIdentifier { private final Object _sessionIdentifier; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 3e1477fc1c..e86c85e0e4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -65,6 +65,8 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; @@ -185,6 +187,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed */ public void commitTransactions(int channelId) throws JMException { + CurrentActor.set(getLogActor()); try { AMQChannel channel = _session.getChannel(channelId); @@ -199,6 +202,10 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed { throw new MBeanException(ex, ex.toString()); } + finally + { + CurrentActor.remove(); + } } /** @@ -209,6 +216,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed */ public void rollbackTransactions(int channelId) throws JMException { + CurrentActor.set(getLogActor()); try { AMQChannel channel = _session.getChannel(channelId); @@ -223,6 +231,10 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed { throw new MBeanException(ex, ex.toString()); } + finally + { + CurrentActor.remove(); + } } /** @@ -269,18 +281,38 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed 0, 0); - _session.writeFrame(responseBody.generateFrame(0)); - + CurrentActor.set(getLogActor()); try { - _session.closeSession(); + _session.writeFrame(responseBody.generateFrame(0)); + + try + { + + _session.closeSession(); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } } - catch (AMQException ex) + finally { - throw new MBeanException(ex, ex.toString()); + CurrentActor.remove(); } } + /** + * Return the LogActor for this MBean Session + * //fixme currently simply returning the managed sessions LogActor, should + * be the ManagementActor + * @return + */ + private LogActor getLogActor() + { + return _session.getLogActor(); + } + @Override public MBeanNotificationInfo[] getNotificationInfo() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 649e84cb50..ad1e8a580e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -88,7 +88,7 @@ public class VirtualHost implements Accessable private final Timer _houseKeepingTimer; private VirtualHostConfiguration _configuration; - + public void setAccessableName(String name) { _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" @@ -386,11 +386,6 @@ public class VirtualHost implements Accessable return _exchangeFactory; } - public ApplicationRegistry getApplicationRegistry() - { - throw new UnsupportedOperationException(); - } - public MessageStore getMessageStore() { return _messageStore; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index ef345f45c4..28f5d417ff 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.log4j.Logger; @@ -475,14 +476,14 @@ public class AbstractHeadersExchangeTestBase extends TestCase new LinkedList() ); - Message(String id, String... headers) throws AMQException + Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException { - this(id, getHeaders(headers)); + this(protocolSession, id, getHeaders(headers)); } - Message(String id, FieldTable headers) throws AMQException + Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException { - this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null); + this(protocolSession, _messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null); } public IncomingMessage getIncomingMessage() @@ -490,7 +491,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase return _incoming; } - private Message(long messageId, + private Message(AMQProtocolSession protocolsession, long messageId, MessagePublishInfo publish, ContentHeaderBody header, List bodies) throws AMQException @@ -499,7 +500,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase - _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore)); + _incoming = new TestIncomingMessage(getMessageId(),publish, _txnContext, protocolsession); _incoming.setContentHeaderBody(header); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index a26f7f16d7..27ef04431b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -57,7 +57,7 @@ public class DestWildExchangeTest extends TestCase _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); _store = new MemoryMessageStore(); _context = new StoreContext(); - _protocolSession = new InternalTestProtocolSession(); + _protocolSession = new InternalTestProtocolSession(_vhost); } public void tearDown() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index fd11ddeae2..3c6abd0ad9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -23,14 +23,21 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.NullApplicationRegistry; -import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase { + AMQProtocolSession _protocolSession; + protected void setUp() throws Exception { super.setUp(); ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); + // Just use the first vhost. + VirtualHost virtualHost = ApplicationRegistry.getInstance(1).getVirtualHostRegistry().getVirtualHosts().iterator().next(); + _protocolSession = new InternalTestProtocolSession(virtualHost); } protected void tearDown() @@ -49,21 +56,21 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase TestQueue q7 = bindDefault("F0000", "F0001=Bear"); TestQueue q8 = bindDefault("F0000=Aardvark", "F0001"); - routeAndTest(new Message("Message1", "F0000"), q1); - routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); - routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); - routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), + routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1); + routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2); + routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); + routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); + routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q5, q6, q7, q8); - routeAndTest(new Message("Message6", "F0002")); + routeAndTest(new Message(_protocolSession, "Message6", "F0002")); - Message m7 = new Message("Message7", "XXXXX"); + Message m7 = new Message(_protocolSession, "Message7", "XXXXX"); MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo()); pb7.setMandatory(true); routeAndTest(m7,true); - Message m8 = new Message("Message8", "F0000"); + Message m8 = new Message(_protocolSession, "Message8", "F0000"); MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo()); pb8.setMandatory(true); routeAndTest(m8,false,q1); @@ -79,19 +86,19 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any"); TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any"); - routeAndTest(new Message("Message1", "F0000"), q1, q3); - routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4); - routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message6", "F0002")); + routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1, q3); + routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2, q3, q4); + routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); + routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); + routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); + routeAndTest(new Message(_protocolSession, "Message6", "F0002")); } public void testMandatory() throws AMQException { bindDefault("F0000"); - Message m1 = new Message("Message1", "XXXXX"); - Message m2 = new Message("Message2", "F0000"); + Message m1 = new Message(_protocolSession, "Message1", "XXXXX"); + Message m2 = new Message(_protocolSession, "Message2", "F0000"); MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo()); pb1.setMandatory(true); MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java index 298e3bc22c..009699be35 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java @@ -27,11 +27,9 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.MockProtocolSession; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.logging.actors.AMQPConnectionActor; import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.RootMessageLoggerImpl; @@ -56,38 +54,33 @@ public class AMQPChannelActorTest extends TestCase LogActor _amqpActor; UnitTestMessageLogger _rawLogger; + AMQProtocolSession _session; + AMQChannel _channel; public void setUp() throws ConfigurationException, AMQException { Configuration config = new PropertiesConfiguration(); ServerConfiguration serverConfig = new ServerConfiguration(config); + setUpWithConfig(serverConfig); + } + + private void setUpWithConfig(ServerConfiguration serverConfig) throws AMQException + { _rawLogger = new UnitTestMessageLogger(); RootMessageLogger rootLogger = new RootMessageLoggerImpl(serverConfig, _rawLogger); + VirtualHost virtualHost = ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts().iterator().next(); + // Create a single session for this test. - // Re-use is ok as we are testing the LogActor object is set correctly, - // not the value of the output. - AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore()); - // Use the first Virtualhost that has been defined to initialise - // the MockProtocolSession. This prevents a NPE when the - // AMQPActor attempts to lookup the name of the VHost. - try - { - session.setVirtualHost(ApplicationRegistry.getInstance(). - getVirtualHostRegistry().getVirtualHosts(). - toArray(new VirtualHost[1])[0]); - } - catch (AMQException e) - { - fail("Unable to set virtualhost on session:" + e.getMessage()); - } + _session = new InternalTestProtocolSession(virtualHost); - AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore()); + _channel = new AMQChannel(_session, 1, _session.getVirtualHost().getMessageStore()); - _amqpActor = new AMQPChannelActor(channel, rootLogger); + _amqpActor = new AMQPChannelActor(_channel, rootLogger); } @@ -105,22 +98,7 @@ public class AMQPChannelActorTest extends TestCase */ public void testChannel() { - final String message = "test logging"; - - _amqpActor.message(new LogSubject() - { - public String toString() - { - return "[AMQPActorTest]"; - } - - }, new LogMessage() - { - public String toString() - { - return message; - } - }); + final String message = sendTestMessage(); List logs = _rawLogger.getLogMessages(); @@ -146,40 +124,12 @@ public class AMQPChannelActorTest extends TestCase } - public void testChannelLoggingOff() throws ConfigurationException, AMQException + /** + * Log a message using the test Actor + * @return the logged message + */ + private String sendTestMessage() { - Configuration config = new PropertiesConfiguration(); - config.addProperty("status-updates", "OFF"); - - ServerConfiguration serverConfig = new ServerConfiguration(config); - - _rawLogger = new UnitTestMessageLogger(); - RootMessageLogger rootLogger = - new RootMessageLoggerImpl(serverConfig, _rawLogger); - - // Create a single session for this test. - // Re-use is ok as we are testing the LogActor object is set correctly, - // not the value of the output. - AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore()); - // Use the first Virtualhost that has been defined to initialise - // the MockProtocolSession. This prevents a NPE when the - // AMQPActor attempts to lookup the name of the VHost. - try - { - session.setVirtualHost(ApplicationRegistry.getInstance(). - getVirtualHostRegistry().getVirtualHosts(). - toArray(new VirtualHost[1])[0]); - } - catch (AMQException e) - { - fail("Unable to set virtualhost on session:" + e.getMessage()); - } - - - AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore()); - - _amqpActor = new AMQPChannelActor(channel, rootLogger); - final String message = "test logging"; _amqpActor.message(new LogSubject() @@ -196,6 +146,152 @@ public class AMQPChannelActorTest extends TestCase return message; } }); + return message; + } + + /** + * Test that if logging is configured to be off in the configuration that + * no logging is presented + * @throws ConfigurationException + * @throws AMQException + */ + public void testChannelLoggingOFF() throws ConfigurationException, AMQException + { + Configuration config = new PropertiesConfiguration(); + config.addProperty("status-updates", "OFF"); + + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + + setUpWithConfig(serverConfig); + + sendTestMessage(); + + List logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 0, logs.size()); + + } + + /** + * Test that if logging is configured to be off in the configuration that + * no logging is presented + * @throws ConfigurationException + * @throws AMQException + */ + public void testChannelLoggingOfF() throws ConfigurationException, AMQException + { + Configuration config = new PropertiesConfiguration(); + config.addProperty("status-updates", "OfF"); + + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + + setUpWithConfig(serverConfig); + + sendTestMessage(); + + List logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 0, logs.size()); + + } + + /** + * Test that if logging is configured to be off in the configuration that + * no logging is presented + * @throws ConfigurationException + * @throws AMQException + */ + public void testChannelLoggingOff() throws ConfigurationException, AMQException + { + Configuration config = new PropertiesConfiguration(); + config.addProperty("status-updates", "Off"); + + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + + setUpWithConfig(serverConfig); + + sendTestMessage(); + + List logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 0, logs.size()); + + } + + /** + * Test that if logging is configured to be off in the configuration that + * no logging is presented + * @throws ConfigurationException + * @throws AMQException + */ + public void testChannelLoggingofF() throws ConfigurationException, AMQException + { + Configuration config = new PropertiesConfiguration(); + config.addProperty("status-updates", "ofF"); + + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + + setUpWithConfig(serverConfig); + + sendTestMessage(); + + List logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 0, logs.size()); + + } + + /** + * Test that if logging is configured to be off in the configuration that + * no logging is presented + * @throws ConfigurationException + * @throws AMQException + */ + public void testChannelLoggingoff() throws ConfigurationException, AMQException + { + Configuration config = new PropertiesConfiguration(); + config.addProperty("status-updates", "off"); + + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + + setUpWithConfig(serverConfig); + + sendTestMessage(); + + List logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 0, logs.size()); + + } + + /** + * Test that if logging is configured to be off in the configuration that + * no logging is presented + * @throws ConfigurationException + * @throws AMQException + */ + public void testChannelLoggingoFf() throws ConfigurationException, AMQException + { + Configuration config = new PropertiesConfiguration(); + config.addProperty("status-updates", "oFf"); + + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + + setUpWithConfig(serverConfig); + + sendTestMessage(); List logs = _rawLogger.getLogMessages(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java index c220865864..54eddf1050 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java @@ -26,18 +26,16 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.MockProtocolSession; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.logging.actors.AMQPConnectionActor; -import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.RootMessageLoggerImpl; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.LogMessage; -import org.apache.qpid.server.logging.LogActor; import java.util.List; @@ -56,41 +54,34 @@ public class AMQPConnectionActorTest extends TestCase LogActor _amqpActor; UnitTestMessageLogger _rawLogger; - public void setUp() throws ConfigurationException + public void setUp() throws ConfigurationException, AMQException { Configuration config = new PropertiesConfiguration(); ServerConfiguration serverConfig = new ServerConfiguration(config); + setUpWithConfig(serverConfig); + } + + public void tearDown() + { + _rawLogger.clearLogMessages(); + } + + private void setUpWithConfig(ServerConfiguration serverConfig) throws AMQException + { _rawLogger = new UnitTestMessageLogger(); RootMessageLogger rootLogger = new RootMessageLoggerImpl(serverConfig, _rawLogger); + VirtualHost virtualHost = ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts().iterator().next(); + // Create a single session for this test. - // Re-use is ok as we are testing the LogActor object is set correctly, - // not the value of the output. - AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore()); - // Use the first Virtualhost that has been defined to initialise - // the MockProtocolSession. This prevents a NPE when the - // AMQPActor attempts to lookup the name of the VHost. - try - { - session.setVirtualHost(ApplicationRegistry.getInstance(). - getVirtualHostRegistry().getVirtualHosts(). - toArray(new VirtualHost[1])[0]); - } - catch (AMQException e) - { - fail("Unable to set virtualhost on session:" + e.getMessage()); - } + AMQProtocolSession session = new InternalTestProtocolSession(virtualHost); _amqpActor = new AMQPConnectionActor(session, rootLogger); } - public void tearDown() - { - _rawLogger.clearLogMessages(); - } - /** * Test the AMQPActor logging as a Connection level. * @@ -98,26 +89,10 @@ public class AMQPConnectionActorTest extends TestCase * * The log message should be fully repalaced (no '{n}' values) and should * not contain any channel identification. - * */ public void testConnection() { - final String message = "test logging"; - - _amqpActor.message(new LogSubject() - { - public String toString() - { - return "[AMQPActorTest]"; - } - - }, new LogMessage() - { - public String toString() - { - return message; - } - }); + final String message = sendLogMessage(); List logs = _rawLogger.getLogMessages(); @@ -129,7 +104,7 @@ public class AMQPConnectionActorTest extends TestCase // Verify that the message has the correct type assertTrue("Message contains the [con: prefix", - logs.get(0).toString().contains("[con:")); + logs.get(0).toString().contains("[con:")); // Verify that all the values were presented to the MessageFormatter // so we will not end up with '{n}' entries in the log. @@ -138,11 +113,9 @@ public class AMQPConnectionActorTest extends TestCase // Verify that the logged message does not contains the 'ch:' marker assertFalse("Message was logged with a channel identifier." + logs.get(0), - logs.get(0).toString().contains("/ch:")); + logs.get(0).toString().contains("/ch:")); } - - public void testConnectionLoggingOff() throws ConfigurationException, AMQException { Configuration config = new PropertiesConfiguration(); @@ -150,31 +123,18 @@ public class AMQPConnectionActorTest extends TestCase ServerConfiguration serverConfig = new ServerConfiguration(config); - _rawLogger = new UnitTestMessageLogger(); - RootMessageLogger rootLogger = - new RootMessageLoggerImpl(serverConfig, _rawLogger); + setUpWithConfig(serverConfig); - // Create a single session for this test. - // Re-use is ok as we are testing the LogActor object is set correctly, - // not the value of the output. - AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore()); - // Use the first Virtualhost that has been defined to initialise - // the MockProtocolSession. This prevents a NPE when the - // AMQPActor attempts to lookup the name of the VHost. - try - { - session.setVirtualHost(ApplicationRegistry.getInstance(). - getVirtualHostRegistry().getVirtualHosts(). - toArray(new VirtualHost[1])[0]); - } - catch (AMQException e) - { - fail("Unable to set virtualhost on session:" + e.getMessage()); - } + sendLogMessage(); + List logs = _rawLogger.getLogMessages(); - _amqpActor = new AMQPConnectionActor(session, rootLogger); + assertEquals("Message log size not as expected.", 0, logs.size()); + + } + private String sendLogMessage() + { final String message = "test logging"; _amqpActor.message(new LogSubject() @@ -191,12 +151,7 @@ public class AMQPConnectionActorTest extends TestCase return message; } }); - - List logs = _rawLogger.getLogMessages(); - - assertEquals("Message log size not as expected.", 0, logs.size()); - + return message; } - } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java index c1cc3253a8..e7dc0ea5da 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java @@ -26,9 +26,8 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.MockProtocolSession; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; /** @@ -61,27 +60,16 @@ public class CurrentActorTest extends TestCase final Exception[] _errors = new Exception[THREADS]; // Create a single session for this test. - AMQProtocolSession session; + AMQProtocolSession _session; - public void setUp() + public void setUp() throws AMQException { // Create a single session for this test. - // Re-use is ok as we are testing the LogActor object is set correctly, - // not the value of the output. - session = new MockProtocolSession(new MemoryMessageStore()); - // Use the first Virtualhost that has been defined to initialise - // the MockProtocolSession. This prevents a NPE when the - // AMQPActor attempts to lookup the name of the VHost. - try - { - session.setVirtualHost(ApplicationRegistry.getInstance(). - getVirtualHostRegistry().getVirtualHosts(). - toArray(new VirtualHost[1])[0]); - } - catch (AMQException e) - { - fail("Unable to set virtualhost on session:" + e.getMessage()); - } + VirtualHost virtualHost = ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts().iterator().next(); + + // Create a single session for this test. + _session = new InternalTestProtocolSession(virtualHost); } public void testFIFO() throws AMQException @@ -89,7 +77,7 @@ public class CurrentActorTest extends TestCase // Create a new actor using retrieving the rootMessageLogger from // the default ApplicationRegistry. //fixme reminder that we need a better approach for broker testing. - AMQPConnectionActor connectionActor = new AMQPConnectionActor(session, + AMQPConnectionActor connectionActor = new AMQPConnectionActor(_session, ApplicationRegistry.getInstance(). getRootMessageLogger()); @@ -120,7 +108,7 @@ public class CurrentActorTest extends TestCase * to push the actor on to the stack */ - AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore()); + AMQChannel channel = new AMQChannel(_session, 1, _session.getVirtualHost().getMessageStore()); AMQPChannelActor channelActor = new AMQPChannelActor(channel, ApplicationRegistry.getInstance(). @@ -218,7 +206,7 @@ public class CurrentActorTest extends TestCase // Create a new actor using retrieving the rootMessageLogger from // the default ApplicationRegistry. //fixme reminder that we need a better approach for broker testing. - AMQPConnectionActor actor = new AMQPConnectionActor(session, + AMQPConnectionActor actor = new AMQPConnectionActor(_session, ApplicationRegistry.getInstance(). getRootMessageLogger()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java index 2a37eae728..b4dd3da2e6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java @@ -27,15 +27,12 @@ public class ChannelMessagesTest extends AbstractTestMessages { public void testMessage1001() { - Integer prefetch = 12345; - - _logMessage = ChannelMessages.CHN_1001(prefetch); + _logMessage = ChannelMessages.CHN_1001(); List log = performLog(); // We use the MessageFormat here as that is what the ChannelMessage // will do, this makes the resulting value 12,345 - String[] expected = {"Create", "Prefetch", - MessageFormat.format("{0, number}", prefetch)}; + String[] expected = {"Create"}; validateLogMessage(log, "CHN-1001", expected); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java index eb76029a5c..d234c88210 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java @@ -24,12 +24,12 @@ import java.util.List; public class ConnectionMessagesTest extends AbstractTestMessages { - public void testMessage1001_WithProtocolVersion() + public void testMessage1001_WithClientIDProtocolVersion() { String clientID = "client"; String protocolVersion = "8-0"; - _logMessage = ConnectionMessages.CON_1001(clientID, protocolVersion, true); + _logMessage = ConnectionMessages.CON_1001(clientID, protocolVersion, true , true); List log = performLog(); String[] expected = {"Open :", "Client ID", clientID, @@ -38,11 +38,11 @@ public class ConnectionMessagesTest extends AbstractTestMessages validateLogMessage(log, "CON-1001", expected); } - public void testMessage1001_NoProtocolVersion() + public void testMessage1001_WithClientIDNoProtocolVersion() { String clientID = "client"; - _logMessage = ConnectionMessages.CON_1001(clientID, null, false); + _logMessage = ConnectionMessages.CON_1001(clientID, null,true, false); List log = performLog(); String[] expected = {"Open :", "Client ID", clientID}; @@ -50,6 +50,29 @@ public class ConnectionMessagesTest extends AbstractTestMessages validateLogMessage(log, "CON-1001", expected); } + public void testMessage1001_WithNOClientIDProtocolVersion() + { + String protocolVersion = "8-0"; + + _logMessage = ConnectionMessages.CON_1001(null, protocolVersion, false , true); + List log = performLog(); + + String[] expected = {"Open", ": Protocol Version :", protocolVersion}; + + validateLogMessage(log, "CON-1001", expected); + } + + public void testMessage1001_WithNoClientIDNoProtocolVersion() + { + _logMessage = ConnectionMessages.CON_1001(null, null,false, false); + List log = performLog(); + + String[] expected = {"Open"}; + + validateLogMessage(log, "CON-1001", expected); + } + + public void testMessage1002() { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java index d7a5aa667b..4b69a46793 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java @@ -77,10 +77,8 @@ public class Log4jMessageLoggerTest extends TestCase * Verify that the default configuraion of Log4jMessageLogger will * log a message. * - * @throws IOException - * @throws InterruptedException */ - public void testDefaultLogsMessage() throws IOException, InterruptedException + public void testDefaultLogsMessage() { // Create a logger to test Log4jMessageLogger logger = new Log4jMessageLogger(); @@ -95,28 +93,17 @@ public class Log4jMessageLoggerTest extends TestCase } /** - * This test checks that if the Root Logger level is set such that the INFO - * messages would not be logged then the Log4jMessageLogger default of INFO - * will result in logging not being presented. + * This test verifies that the Log4jMessageLogger does not inherit a logging + * level from the RootLogger. The Log4jMessageLogger default of INFO + * will result in logging being presented. * - * @throws IOException - * @throws InterruptedException */ - public void testDefaultsLogsAtInfo() throws IOException, InterruptedException + public void testLoggerDoesNotInheritRootLevel() { - // Create a logger to test - Log4jMessageLogger logger = new Log4jMessageLogger(); - - //Create Message for test - String message = "testDefaults"; - //Set default logger level to off - Logger.getRootLogger().setLevel(Level.WARN); + Logger.getRootLogger().setLevel(Level.OFF); - // Log the message - logger.rawMessage(message); - - verifyNoLog(message); + testDefaultLogsMessage(); } /** @@ -125,10 +112,8 @@ public class Log4jMessageLoggerTest extends TestCase * Test this by setting the default logger level to off which has been * verified to work by test 'testDefaultsLevelObeyed' * - * @throws IOException - * @throws InterruptedException */ - public void testDefaultLoggerAdjustment() throws IOException, InterruptedException + public void testDefaultLoggerAdjustment() { String loggerName = "TestLogger"; // Create a logger to test @@ -150,41 +135,6 @@ public class Log4jMessageLoggerTest extends TestCase Logger.getLogger(Log4jMessageLogger.DEFAULT_LOGGER).setLevel(originalLevel); } - /** - * Test that changing the log level has an effect. - * Set the level to be debug - * but only set the logger to log at INFO - * there should be no data printed - * subsequently changing the root logger to allow DEBUG should - * show the message - * - * @throws IOException - * @throws InterruptedException - */ - public void testDefaultsLevelObeyed() throws IOException, InterruptedException - { - // Create a logger to test - Log4jMessageLogger logger = new Log4jMessageLogger("DEBUG", Log4jMessageLogger.DEFAULT_LOGGER); - - //Create Message for test - String message = "testDefaults"; - - //Set root logger to INFO only - Logger.getRootLogger().setLevel(Level.INFO); - - // Log the message - logger.rawMessage(message); - - verifyNoLog(message); - - //Set root logger to INFO only - Logger.getRootLogger().setLevel(Level.DEBUG); - - // Log the message - logger.rawMessage(message); - - verifyLogPresent(message); - } /** * Check that the Log Message reached log4j diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java index 02cdbdbe6a..56d611a44f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java @@ -36,6 +36,9 @@ import org.apache.qpid.server.logging.actors.TestBlankActor; import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.List; @@ -44,6 +47,19 @@ public abstract class AbstractTestLogSubject extends TestCase protected Configuration _config = new PropertiesConfiguration(); protected LogSubject _subject = null; + AMQProtocolSession _session; + + public void setUp() throws Exception + { + super.setUp(); + + VirtualHost virtualHost = ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts().iterator().next(); + + // Create a single session for this test. + _session = new InternalTestProtocolSession(virtualHost); + } + protected List performLog() throws ConfigurationException { if (_subject == null) @@ -96,15 +112,19 @@ public abstract class AbstractTestLogSubject extends TestCase assertEquals("Username not as expected", userNameParts[0], user); // Extract IP. + // The connection will be of the format - guest@/127.0.0.1:1/test + // and so our userNamePart will be '/127.0.0.1:1/test' String[] ipParts = userNameParts[1].split("/"); + // We will have three sections assertEquals("Unable to split IP from rest of Connection:" - + userNameParts[1], 2, ipParts.length); + + userNameParts[1], 3, ipParts.length); - assertEquals("IP not as expected", ipParts[0], ipString); + // We need to skip the first '/' split will be empty so validate 1 as IP + assertEquals("IP not as expected", ipString, ipParts[1]); - //Finally check vhost - assertEquals("Virtualhost name not as expected.", vhost, ipParts[1]); + //Finally check vhost which is section 2 + assertEquals("Virtualhost name not as expected.", vhost, ipParts[2]); } /** @@ -172,7 +192,7 @@ public abstract class AbstractTestLogSubject extends TestCase * @param message the message to search * @param vhost the vhostName to check against */ - protected void verifyVirtualHost(String message, VirtualHost vhost) + static public void verifyVirtualHost(String message, VirtualHost vhost) { String vhostSlice = getSlice("vh", message); @@ -199,7 +219,7 @@ public abstract class AbstractTestLogSubject extends TestCase * * @return the slice if found otherwise null is returned */ - protected String getSlice(String sliceID, String message) + static public String getSlice(String sliceID, String message) { int indexOfSlice = message.indexOf(sliceID + "("); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java index 9d5cb70f4b..75f31d53d1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java @@ -20,13 +20,7 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.AMQException; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.MockProtocolSession; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.virtualhost.VirtualHost; public class ChannelLogSubjectTest extends ConnectionLogSubjectTest { @@ -37,24 +31,6 @@ public class ChannelLogSubjectTest extends ConnectionLogSubjectTest { super.setUp(); - // Create a single session for this test. - // Re-use is ok as we are testing the LogActor object is set correctly, - // not the value of the output. - _session = new MockProtocolSession(new MemoryMessageStore()); - // Use the first Virtualhost that has been defined to initialise - // the MockProtocolSession. This prevents a NPE when the - // AMQPActor attempts to lookup the name of the VHost. - try - { - _session.setVirtualHost(ApplicationRegistry.getInstance(). - getVirtualHostRegistry().getVirtualHosts(). - toArray(new VirtualHost[1])[0]); - } - catch (AMQException e) - { - fail("Unable to set virtualhost on session:" + e.getMessage()); - } - AMQChannel channel = new AMQChannel(_session, _channelID, _session.getVirtualHost().getMessageStore()); _subject = new ChannelLogSubject(channel); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java index ff2d9b5e11..0eb9901757 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java @@ -20,39 +20,13 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.MockProtocolSession; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.virtualhost.VirtualHost; - public class ConnectionLogSubjectTest extends AbstractTestLogSubject { - AMQProtocolSession _session; public void setUp() throws Exception { super.setUp(); - // Create a single session for this test. - // Re-use is ok as we are testing the LogActor object is set correctly, - // not the value of the output. - _session = new MockProtocolSession(new MemoryMessageStore()); - // Use the first Virtualhost that has been defined to initialise - // the MockProtocolSession. This prevents a NPE when the - // AMQPActor attempts to lookup the name of the VHost. - try - { - _session.setVirtualHost(ApplicationRegistry.getInstance(). - getVirtualHostRegistry().getVirtualHosts(). - toArray(new VirtualHost[1])[0]); - } - catch (AMQException e) - { - fail("Unable to set virtualhost on session:" + e.getMessage()); - } - _subject = new ConnectionLogSubject(_session); } @@ -63,7 +37,7 @@ public class ConnectionLogSubjectTest extends AbstractTestLogSubject */ protected void validateLogStatement(String message) { - verifyConnection(_session.getSessionID(), "MockProtocolSessionUser", "null", "test", message); + verifyConnection(_session.getSessionID(), "InternalTestProtocolSession", "127.0.0.1:1", "test", message); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java new file mode 100644 index 0000000000..624421f447 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.ApplicationRegistry; + +public class MessageStoreLogSubjectTest extends AbstractTestLogSubject +{ + VirtualHost _testVhost; + + public void setUp() throws Exception + { + super.setUp(); + + _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry(). + getVirtualHost("test"); + + _subject = new MessagesStoreLogSubject(_testVhost, _testVhost.getMessageStore()); + } + + /** + * Validate that the logged Subject message is as expected: + * MESSAGE [Blank][vh(/test)/ms(MemoryMessageStore)] + * @param message the message whos format needs validation + */ + @Override + protected void validateLogStatement(String message) + { + verifyVirtualHost(message, _testVhost); + + String msSlice = getSlice("ms", message); + + assertNotNull("MessageStore not found:" + message, msSlice); + + assertEquals("MessageStore not correct", + _testVhost.getMessageStore().getClass().getSimpleName(), + msSlice); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java index 0b0b0d78d1..e217497b7b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java @@ -20,17 +20,13 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.flow.LimitlessCreditManager; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.queue.MockProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactory; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; @@ -57,31 +53,13 @@ public class SubscriptionLogSubjectTest extends AbstractTestLogSubject _queue = new MockAMQQueue("QueueLogSubjectTest"); ((MockAMQQueue) _queue).setVirtualHost(_testVhost); - // Create a single session for this test. - // Re-use is ok as we are testing the LogActor object is set correctly, - // not the value of the output. - AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore()); - // Use the first Virtualhost that has been defined to initialise - // the MockProtocolSession. This prevents a NPE when the - // AMQPActor attempts to lookup the name of the VHost. - try - { - session.setVirtualHost(ApplicationRegistry.getInstance(). - getVirtualHostRegistry().getVirtualHosts(). - toArray(new VirtualHost[1])[0]); - } - catch (AMQException e) - { - fail("Unable to set virtualhost on session:" + e.getMessage()); - } + AMQChannel channel = new AMQChannel(_session, _channelID, _session.getVirtualHost().getMessageStore()); - AMQChannel channel = new AMQChannel(session, _channelID, session.getVirtualHost().getMessageStore()); - - session.addChannel(channel); + _session.addChannel(channel); SubscriptionFactory factory = new SubscriptionFactoryImpl(); - _subscription = factory.createSubscription(_channelID, session, new AMQShortString("cTag"), + _subscription = factory.createSubscription(_channelID, _session, new AMQShortString("cTag"), _acks, _filters, _noLocal, new LimitlessCreditManager()); @@ -102,13 +80,13 @@ public class SubscriptionLogSubjectTest extends AbstractTestLogSubject String subscriptionSlice = getSlice("sub:" + _subscription.getSubscriptionID(), message); - + assertNotNull("Unable to locate subscription 'sub:" + _subscription.getSubscriptionID() + "'"); // Adding the ')' is a bit ugly but SubscriptionLogSubject is the only // Subject that nests () and so the simple parser of checking for the // next ')' falls down. - verifyQueue(subscriptionSlice+")", _queue); + verifyQueue(subscriptionSlice + ")", _queue); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index f09b03ab85..6050512679 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -111,8 +111,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); _protocolSession = - new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true), - null); + new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true)); // Need to authenticate session for it to work, (well for logging to work) _protocolSession.setAuthorizedID(new Principal() { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 49c5f8a14b..37dfead2e5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -26,7 +26,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.HashMap; @@ -34,7 +34,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import java.net.SocketAddress; +import java.security.Principal; public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter { @@ -42,7 +42,7 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen final Map>> _channelDelivers; private AtomicInteger _deliveryCount = new AtomicInteger(0); - public InternalTestProtocolSession() throws AMQException + public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException { super(new TestIoSession(), ApplicationRegistry.getInstance().getVirtualHostRegistry(), @@ -50,6 +50,16 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen _channelDelivers = new HashMap>>(); + // Need to authenticate session for it to be representative testing. + setAuthorizedID(new Principal() + { + public String getName() + { + return "InternalTestProtocolSession"; + } + }); + + setVirtualHost(virtualHost); } public ProtocolOutputConverter getProtocolOutputConverter() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java index 9597c1319a..7e16737a83 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.AMQException; @@ -41,7 +42,10 @@ public class MaxChannelsTest extends TestCase public void testChannels() throws Exception { _session = new AMQMinaProtocolSession(new TestIoSession(), _appRegistry - .getVirtualHostRegistry(), new AMQCodecFactory(true), null); + .getVirtualHostRegistry(), new AMQCodecFactory(true)); + + // Set the current Actor for these tests + CurrentActor.set(_session.getLogActor()); // Need to authenticate session for it to work, (well for logging to work) _session.setAuthorizedID(new Principal() @@ -92,6 +96,11 @@ public class MaxChannelsTest extends TestCase // Yikes fail(e.getMessage()); } + finally + { + //Remove the actor set during the test + CurrentActor.remove(); + } ApplicationRegistry.remove(1); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 6589f46c7b..db7312202b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.protocol.AMQMinaProtocolSession; @@ -49,6 +50,7 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.Collections; import java.util.Set; +import java.security.Principal; /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ public class AMQQueueAlertTest extends TestCase @@ -184,7 +186,6 @@ public class AMQQueueAlertTest extends TestCase */ public void testQueueDepthAlertWithSubscribers() throws Exception { - _protocolSession = new InternalTestProtocolSession(); AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); _protocolSession.addChannel(channel); @@ -295,12 +296,13 @@ public class AMQQueueAlertTest extends TestCase super.setUp(); IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(1); _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); - _protocolSession = new InternalTestProtocolSession(); - + _protocolSession = new InternalTestProtocolSession(_virtualHost); + CurrentActor.set(_protocolSession.getLogActor()); } protected void tearDown() { + CurrentActor.remove(); ApplicationRegistry.remove(1); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 1138b465cd..1acf8a3c31 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -175,7 +175,8 @@ public class AMQQueueMBeanTest extends TestCase assertTrue(_queueMBean.getActiveConsumerCount() == 0); - InternalTestProtocolSession protocolSession = new InternalTestProtocolSession(); + InternalTestProtocolSession protocolSession = new InternalTestProtocolSession(_virtualHost); + AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore); protocolSession.addChannel(channel); @@ -372,7 +373,7 @@ public class AMQQueueMBeanTest extends TestCase null); _queueMBean = new AMQQueueMBean(_queue); - _protocolSession = new InternalTestProtocolSession(); + _protocolSession = new InternalTestProtocolSession(_virtualHost); } public void tearDown() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 9c2932c5e2..d360904dd7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -29,6 +29,9 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.flow.LimitlessCreditManager; @@ -55,7 +58,7 @@ public class AckTest extends TestCase private Subscription _subscription; - private MockProtocolSession _protocolSession; + private AMQProtocolSession _protocolSession; private TestMemoryMessageStore _messageStore; @@ -66,19 +69,21 @@ public class AckTest extends TestCase private AMQQueue _queue; private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag"); + private VirtualHost _virtualHost; protected void setUp() throws Exception { super.setUp(); ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); + _virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); _messageStore = new TestMemoryMessageStore(); - _protocolSession = new MockProtocolSession(_messageStore); + _protocolSession = new InternalTestProtocolSession(_virtualHost); _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); _protocolSession.addChannel(_channel); - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, _virtualHost, null); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java deleted file mode 100644 index fe79c40bb9..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.framing.*; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.transport.Sender; - -import javax.security.sasl.SaslServer; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; -import java.security.Principal; -import java.net.SocketAddress; - -/** - * A protocol session that can be used for testing purposes. - */ -public class MockProtocolSession implements AMQProtocolSession -{ - private MessageStore _messageStore; - - private Map _channelMap = new HashMap(); - - private static final AtomicLong idGenerator = new AtomicLong(0); - - private final long _sessionID = idGenerator.getAndIncrement(); - private VirtualHost _virtualHost; - - public MockProtocolSession(MessageStore messageStore) - { - _messageStore = messageStore; - } - - public long getSessionID() - { - return _sessionID; - } - - public void dataBlockReceived(AMQDataBlock message) throws Exception - { - } - - public void writeFrame(AMQDataBlock frame) - { - } - - public AMQShortString getContextKey() - { - return null; - } - - public void setContextKey(AMQShortString contextKey) - { - } - - public AMQChannel getChannel(int channelId) - { - AMQChannel channel = _channelMap.get(channelId); - if (channel == null) - { - throw new IllegalArgumentException("Invalid channel id: " + channelId); - } - else - { - return channel; - } - } - - public void addChannel(AMQChannel channel) - { - if (channel == null) - { - throw new IllegalArgumentException("Channel must not be null"); - } - else - { - _channelMap.put(channel.getChannelId(), channel); - } - } - - public void closeChannel(int channelId) throws AMQException - { - } - - public void closeChannelOk(int channelId) - { - - } - - public boolean channelAwaitingClosure(int channelId) - { - return false; - } - - public void removeChannel(int channelId) - { - _channelMap.remove(channelId); - } - - public void initHeartbeats(int delay) - { - } - - public void closeSession() throws AMQException - { - } - - public void closeConnection(int channelId, AMQConnectionException e, boolean closeIoSession) throws AMQException - { - } - - public Object getKey() - { - return null; - } - - public String getLocalFQDN() - { - return null; - } - - public SaslServer getSaslServer() - { - return null; - } - - public void setSaslServer(SaslServer saslServer) - { - } - - public FieldTable getClientProperties() - { - return null; - } - - public void setClientProperties(FieldTable clientProperties) - { - } - - public Object getClientIdentifier() - { - return null; - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public void setVirtualHost(VirtualHost virtualHost) - { - _virtualHost = virtualHost; - } - - public void addSessionCloseTask(Task task) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void removeSessionCloseTask(Task task) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public ProtocolOutputConverter getProtocolOutputConverter() - { - return ProtocolOutputConverterRegistry.getConverter(this); - } - - public void setAuthorizedID(Principal authorizedID) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public Principal getPrincipal() - { - return new Principal() - { - public String getName() - { - return "MockProtocolSessionUser"; - } - }; - - } - - public SocketAddress getRemoteAddress() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public MethodRegistry getMethodRegistry() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void methodFrameReceived(int channelId, AMQMethodBody body) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void contentHeaderReceived(int channelId, ContentHeaderBody body) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void contentBodyReceived(int channelId, ContentBody body) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void heartbeatBodyReceived(int channelId, HeartbeatBody body) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public MethodDispatcher getMethodDispatcher() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public ProtocolSessionIdentifier getSessionIdentifier() - { - return null; - } - - public byte getProtocolMajorVersion() - { - return getProtocolVersion().getMajorVersion(); - } - - public byte getProtocolMinorVersion() - { - return getProtocolVersion().getMinorVersion(); - } - - - public ProtocolVersion getProtocolVersion() - { - return ProtocolVersion.getLatestSupportedVersion(); //To change body of implemented methods use File | Settings | File Templates. - } - - - public VersionSpecificRegistry getRegistry() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setSender(Sender sender) - { - // FIXME AS TODO - - } - - public void init() - { - // TODO Auto-generated method stub - - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java index 6c6835ccca..8262ca0e29 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java @@ -31,15 +31,15 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.server.configuration.SecurityConfiguration; -import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugins.MockPluginManager; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.queue.MockProtocolSession; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.ApplicationRegistry; public class ACLManagerTest extends TestCase { @@ -64,8 +64,13 @@ public class ACLManagerTest extends TestCase _pluginManager = new MockPluginManager(""); _authzManager = new ACLManager(_conf, _pluginManager); - - _session = new MockProtocolSession(new TestableMemoryMessageStore()); + + + VirtualHost virtualHost = ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts().iterator().next(); + + // Create a single session for this test. + _session = new InternalTestProtocolSession(virtualHost); } public void testACLManagerConfigurationPluginManager() throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 4317a501ed..30b571ef63 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -352,7 +352,7 @@ public class MessageStoreTest extends TestCase messageInfo, new NonTransactionalContext(_virtualHost.getMessageStore(), new StoreContext(), null, null), - new InternalTestProtocolSession()); + new InternalTestProtocolSession(_virtualHost)); } catch (AMQException e) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 585ed9a538..b706ee51d8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -32,6 +32,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.InternalTestProtocolSession; @@ -79,17 +80,8 @@ public class InternalBrokerBaseCase extends TestCase _queue.bind(defaultExchange, QUEUE_NAME, null); - _session = new InternalTestProtocolSession(); - - _session.setAuthorizedID(new Principal() - { - public String getName() - { - return "InternalBrokerBaseCaseUser"; - } - }); - - _session.setVirtualHost(_virtualHost); + _session = new InternalTestProtocolSession(_virtualHost); + CurrentActor.set(_session.getLogActor()); _channel = new MockChannel(_session, 1, _messageStore); @@ -98,6 +90,7 @@ public class InternalBrokerBaseCase extends TestCase public void tearDown() throws Exception { + CurrentActor.remove(); ApplicationRegistry.remove(1); super.tearDown(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java index 84bee7984b..4ecbffb4b4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -39,6 +39,8 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger; import java.util.Collection; import java.util.HashMap; @@ -73,6 +75,9 @@ public class TestApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { + _rootMessageLogger = new RootMessageLoggerImpl(_configuration, + new Log4jMessageLogger()); + Properties users = new Properties(); users.put("guest", "guest"); diff --git a/qpid/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm b/qpid/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm index 917901a18b..10be2299e9 100644 --- a/qpid/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm +++ b/qpid/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm @@ -106,14 +106,26 @@ public class ${type.name}Messages public static LogMessage ${message.methodName}(#foreach($parameter in ${message.parameters})${parameter.type} ${parameter.name}#if (${velocityCount} != ${message.parameters.size()} ), #end #end#if(${message.parameters.size()} > 0 && ${message.options.size()} > 0), #end#foreach($option in ${message.options})boolean ${option.name}#if (${velocityCount} != ${message.options.size()} ), #end#end) { +## +## If we don't have any parameters then we don't need the overhead of using the +## message formatter so we can just set our return message to the retreived +## fixed string. +## So we don't need to update the _formatter with the new pattern. +## +## Here we setup rawMessage to be the formatted message ready for direct return +## with the message.name or further processing to remove options. +## +#if(${message.parameters.size()} > 0) final Object[] messageArguments = {#foreach($parameter in ${message.parameters})${parameter.name}#if (${velocityCount} != ${message.parameters.size()} ), #end#end}; _formatter.applyPattern(_messages.getString("${message.name}")); - - ## If we have some options then we need to build the message based - ## on those values so only provide that logic if we need it. -#if(${message.options.size()} > 0) String rawMessage = _formatter.format(messageArguments); +#else + String rawMessage = _messages.getString("${message.name}"); +#end +## If we have some options then we need to build the message based +## on those values so only provide that logic if we need it. +#if(${message.options.size()} > 0) StringBuffer msg =new StringBuffer("${message.name} : "); // Split the formatted message up on the option values so we can @@ -124,8 +136,8 @@ public class ${type.name}Messages int end; if (parts.length > 1) { - ## For Each Optional value check if it has been enabled and then - ## append it to the log. +## For Each Optional value check if it has been enabled and then +## append it to the log. #foreach($option in ${message.options}) // Add Option : ${option.value} @@ -142,8 +154,8 @@ public class ${type.name}Messages final String message = msg.toString(); #else - ## If we have no options then we can just format and set the log - final String message = "${message.name} : " + _formatter.format(messageArguments); +## If we have no options then we can just format and set the log + final String message = "${message.name} : " + rawMessage; #end return new LogMessage() diff --git a/qpid/java/common/templates/model/ProtocolVersionListClass.vm b/qpid/java/common/templates/model/ProtocolVersionListClass.vm index 9ac6adfdf5..d3e943e36f 100644 --- a/qpid/java/common/templates/model/ProtocolVersionListClass.vm +++ b/qpid/java/common/templates/model/ProtocolVersionListClass.vm @@ -41,12 +41,14 @@ public class ProtocolVersion implements Comparable { private final byte _majorVersion; private final byte _minorVersion; + private final String _stringFormat; public ProtocolVersion(byte majorVersion, byte minorVersion) { _majorVersion = majorVersion; _minorVersion = minorVersion; + _stringFormat = _majorVersion+"-"+_minorVersion; } public byte getMajorVersion() @@ -59,6 +61,11 @@ public class ProtocolVersion implements Comparable return _minorVersion; } + public String toString() + { + return _stringFormat; + } + public int compareTo(Object o) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/AlertingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/AlertingTest.java deleted file mode 100644 index 205a741b2f..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/AlertingTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileReader; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.log4j.FileAppender; -import org.apache.log4j.Logger; -import org.apache.log4j.SimpleLayout; -import org.apache.qpid.server.store.DerbyMessageStore; -import org.apache.qpid.test.utils.QpidTestCase; - -public class AlertingTest extends QpidTestCase -{ - private String VIRTUALHOST = "test"; - private Session _session; - private Connection _connection; - private Queue _destination; - private MessageConsumer _consumer; // Never read, but does need to be here to create the destination. - private File _logfile; - private XMLConfiguration _configuration; - private int _numMessages; - - public void setUp() throws Exception - { - // First we munge the config file and, if we're in a VM, set up an additional logfile - - _configuration = new XMLConfiguration(_configFile); - _configuration.setProperty("management.enabled", "false"); - Class storeClass = DerbyMessageStore.class; - Class bdb = null; - try { - bdb = Class.forName("org.apache.qpid.store.berkleydb.BDBMessageStore"); - } - catch (ClassNotFoundException e) - { - // No BDB store, we'll use Derby instead. - } - if (bdb != null) - { - storeClass = bdb; - } - - _configuration.setProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".store.class", storeClass.getName()); - _numMessages = 50; - - File tmpFile = File.createTempFile("configFile", "test"); - tmpFile.deleteOnExit(); - _configuration.save(tmpFile); - _configFile = tmpFile; - - - if (_outputFile != null) - { - _logfile = _outputFile; - } - else - { - // This is mostly for running the test outside of the ant setup - _logfile = File.createTempFile("logFile", "test"); - FileAppender appender = new FileAppender(new SimpleLayout(), _logfile.getAbsolutePath()); - appender.setFile(_logfile.getAbsolutePath()); - appender.setImmediateFlush(true); - Logger.getRootLogger().addAppender(appender); - //_logfile.deleteOnExit(); - } - - // Then we do the normal setup stuff like starting the broker, getting a connection etc. - - super.setUp(); - - _connection = getConnection(); - _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - _destination = _session.createQueue("testQueue"); - - // Consumer is only used to actually create the destination - _consumer = _session.createConsumer(_destination); - } - - /** - * Checks the log file for MESSAGE_COUNT_ALERT, fails() the test if it's not found and - * places the entire contents in the message to help debug cruise control failures. - * @throws Exception - */ - private void wasAlertFired() throws Exception - { - // Loop through alerts until we're done or 5 seconds have passed, - // just in case the logfile takes a while to flush. - BufferedReader reader = new BufferedReader(new FileReader(_logfile)); - boolean found = false; - long endtime = System.currentTimeMillis()+5000; - while (!found && System.currentTimeMillis() < endtime) - { - while (reader.ready()) - { - String line = reader.readLine(); - if (line.contains("MESSAGE_COUNT_ALERT")) - { - found = true; - } - } - } - if (!found) - { - StringBuffer message = new StringBuffer("Could not find alert in log file: "+_logfile.getAbsolutePath()); - message.append("\n"); - reader = new BufferedReader(new FileReader(_logfile)); - for (int i = 0; i < 79; i++) { message.append("-"); }; - message.append("\n"); - while (reader.ready()) { message.append(reader.readLine() + "\n");} - message.append("\n"); - for (int i = 0; i < 79; i++) { message.append("-"); }; - message.append("\n"); - fail(message.toString()); - } - } - - public void testAlertingReallyWorks() throws Exception - { - // Send 5 messages, make sure that the alert was fired properly. - sendMessage(_session, _destination, _numMessages + 1); - _session.commit(); - wasAlertFired(); - } - - public void testAlertingReallyWorksWithRestart() throws Exception - { - sendMessage(_session, _destination, _numMessages + 1); - _session.commit(); - stopBroker(); - (new FileOutputStream(_logfile)).getChannel().truncate(0); - startBroker(); - wasAlertFired(); - } - - public void testAlertingReallyWorksWithChanges() throws Exception - { - // send some messages and nuke the logs - sendMessage(_session, _destination, 2); - _session.commit(); - stopBroker(); - (new FileOutputStream(_logfile)).getChannel().truncate(0); - - // Change max message count to 5, start broker and make sure that that's triggered at the right time - _configuration.setProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".queues.maximumMessageCount", 5); - startBroker(); - _connection = getConnection(); - _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - - // Trigger the new value - sendMessage(_session, _destination, 3); - _session.commit(); - wasAlertFired(); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java new file mode 100644 index 0000000000..47ae599d2b --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.LogMonitor; + +import java.io.IOException; +import java.util.List; +import java.util.HashMap; +import java.util.LinkedList; + +public class AbstractTestLogging extends QpidTestCase +{ + protected LogMonitor _monitor; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _monitor = new LogMonitor(_outputFile); + } + + /** + * assert that the requested log message has not occured + * + * @param log + * + * @throws IOException + */ + public void assertLoggingNotYetOccured(String log) throws IOException + { + // Ensure the alert has not occured yet + assertEquals("Message has already occured:" + log, 0, + _monitor.findMatches(log).size()); + } + + + protected void validateMessageID(String id, String log) + { + assertEquals("Incorrect message",id, getMessageID(log)); + } + + protected String getMessageID(String log) + { + String message = fromMessage(log); + + return message.substring(0, message.indexOf(" ")); + } + + /** + * Return the first channel id from the log string + * ' ch;X' if there is no channel id return -1. + * + * @param log the log string to search. + * + * @return channel id or -1 if no channel id exists. + */ + protected int getChannelID(String log) + { + int start = log.indexOf("ch:") + 3; + + // If we do a check for ] as the boundary we will get cases where log + // is presented with the bounding. If we don't match a ] then we can use + // the end of the string as the boundary. + int end = log.indexOf("]", start); + if (end == -1) + { + end = log.length(); + } + + try + { + return Integer.parseInt(log.substring(start, end)); + } + catch (Exception e) + { + return -1; + } + } + + protected String fromMessage(String log) + { + int startSubject = log.indexOf("]") + 1; + int start = log.indexOf("]", startSubject) + 1; + + // If we don't have a subject then the second indexOf will return 0 + // in which case we can use the end of the actor as the index. + if (start == 0) + { + start = startSubject; + } + + return log.substring(start).trim(); + } + + /** + * Extract the Subject from the Log Message. + * + * The subject is the second block inclosed in brackets '[ ]'. + * + * If there is no Subject or the second block of brackets '[ ]' cannot be + * identified then an empty String ("") is returned. + * + * The brackets '[ ]' are not included in the returned String. + * + * @param log The log message to process + * @return the Subject string or the empty string ("") if the subject can't be identified. + */ + protected String fromSubject(String log) + { + int start = log.indexOf("[") + 1; + // Take the second index + start = log.indexOf("[", start) + 1; + + // There may not be a subject so in that case return nothing. + if (start == 0) + { + return ""; + } + + int end = log.indexOf("]", start); + try + { + return log.substring(start, end); + } + catch (IndexOutOfBoundsException iobe) + { + return ""; + } + } + + /** + * Extract the actor segment from the log message. + * The Actor segment is the first section enclosed in '[ ]'. + * + * No analysis is performed to ensure that the first '[ ]' section of the + * given log is really an Actor segment. + * + * The brackets '[ ]' are not included in the returned String. + * + * @param log the Log Message + * @return the Actor segment or "" if unable to locate '[ ]' section + */ + protected String fromActor(String log) + { + int start = log.indexOf("[") + 1; + int end = log.indexOf("]", start); + try + { + return log.substring(start, end).trim(); + } + catch (IndexOutOfBoundsException iobe) + { + return ""; + } + } + + /** + * Given our log message extract the connection ID: + * + * The log string will contain the connectionID identified by 'con:' + * + * So extract the value shown here by X: + * + * 'con:X(' + * + * Extract the value between the ':' and '(' and process it as an Integer + * + * If we are unable to find the right index or process the substring as an + * Integer then return -1. + * + * @param log the log String to process + * @return the connection ID or -1. + */ + protected int extractConnectionID(String log) + { + int conIDStart = log.indexOf("con:") + 4; + int conIDEnd = log.indexOf("(", conIDStart); + try + { + return Integer.parseInt(log.substring(conIDStart, conIDEnd)); + } + catch (Exception e) + { + return -1; + } + } + + /** + * Extract the log entry from the raw log line which will contain other + * log4j formatting. + * + * This formatting may impead our testing process so extract the log message + * as we know it to be formatted. + * + * This starts with the string MESSAGE + * @param rawLog the raw log + * @return the log we are expecting to be printed without the log4j prefixes + */ + protected String getLog(String rawLog) + { + int start = rawLog.indexOf("MESSAGE"); + return rawLog.substring(start); + } + + /** + * Given a list of messages that have been pulled out of a log file + * Process the results splitting the log statements in to lists based on the + * actor's connection ID. + * + * So for each log entry extract the Connecition ID from the Actor of the log + * + * Then use that as a key to a HashMap storing the list of log messages for + * that connection. + * + * @param logMessages The list of mixed connection log messages + * @return Map indexed by connection id to a list of log messages just for that connection. + */ + protected HashMap> splitResultsOnConnectionID(List logMessages) + { + HashMap> connectionSplitList = new HashMap>(); + + for (String log : logMessages) + { + // Get the connectionID from the Actor in the Message Log. + int cID = extractConnectionID(fromActor(getLog(log))); + + List connectionData = connectionSplitList.get(cID); + + // Create the initial List if we don't have one already + if (connectionData == null) + { + connectionData = new LinkedList(); + connectionSplitList.put(cID, connectionData); + } + + // Store the log + connectionData.add(log); + } + + return connectionSplitList; + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java new file mode 100644 index 0000000000..14eec8daff --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java @@ -0,0 +1,202 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.logging; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; +import org.apache.qpid.util.LogMonitor; + +import javax.jms.Connection; +import javax.jms.Queue; +import javax.jms.Session; +import java.io.File; + +public class AlertingTest extends AbstractTestLogging +{ + private String VIRTUALHOST = "test"; + private Session _session; + private Connection _connection; + private Queue _destination; + private int _numMessages; + + private static final int ALERT_LOG_WAIT_PERIOD = 5000; + private static final String MESSAGE_COUNT_ALERT = "MESSAGE_COUNT_ALERT"; + + public void setUp() throws Exception + { + // set QPID_WORK to be [QPID_WORK|io.tmpdir]/ + // This ensures that each of these tests operate independantly. + setSystemProperty("QPID_WORK", + System.getProperty("QPID_WORK", + System.getProperty("java.io.tmpdir")) + + File.separator + getName()); + + // Update the configuration to make our virtualhost Persistent. + makeVirtualHostPersistent(VIRTUALHOST); + + _numMessages = 50; + + // Then we do the normal setup stuff like starting the broker, getting a connection etc. + super.setUp(); + + setupConnection(); + } + + /** + * Create a new connection and ensure taht our destination queue is created + * and bound. + * + * Note that the tests here that restart the broker rely on persistence. + * However, the queue creation here is transient. So the queue will not be + * rebound on restart. Hence the consumer creation here rather than just the + * once. + * + * The persistent messages will recreate the queue but not bind it (as it + * was not a durable queue) However, the consumer creation here will ensure + * that the queue is correctly bound and can receive new messages. + * + * @throws Exception + */ + private void setupConnection() + throws Exception + { + _connection = getConnection(); + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + _destination = _session.createQueue("testQueue"); + + // Consumer is only used to actually create the destination + _session.createConsumer(_destination).close(); + } + + /** + * Checks the log file for MESSAGE_COUNT_ALERT, fails() the test if it's not found and + * places the entire contents in the message to help debug cruise control failures. + * + * @throws Exception + */ + private void wasAlertFired() throws Exception + { + if (!_monitor.waitForMessage(MESSAGE_COUNT_ALERT, ALERT_LOG_WAIT_PERIOD)) + { + StringBuffer message = new StringBuffer("Could not find 'MESSAGE_COUNT_ALERT' in log file: " + _monitor.getMonitoredFile().getAbsolutePath()); + message.append("\n"); + + // Add the current contents of the log file to test output + message.append(_monitor.readFile()); + + // Write the server config file to test output + message.append("Server configuration file in use:\n"); + message.append(FileUtils.readFileAsString(_configFile)); + + // Write the virtualhost config file to test output + message.append("\nVirtualhost configuration file in use:\n"); + message.append(FileUtils.readFileAsString(ServerConfiguration. + flatConfig(_configFile).getString("virtualhosts"))); + + fail(message.toString()); + } + } + + public void testAlertingReallyWorks() throws Exception + { + // Send 5 messages, make sure that the alert was fired properly. + sendMessage(_session, _destination, _numMessages + 1); + _session.commit(); + wasAlertFired(); + } + + public void testAlertingReallyWorksWithRestart() throws Exception + { + sendMessage(_session, _destination, _numMessages + 1); + _session.commit(); + stopBroker(); + + // Rest the monitoring clearing the current output file. + _monitor.reset(); + startBroker(); + wasAlertFired(); + } + + /** + * Test that if the alert value is change from the previous value we can + * still get alerts. + * + * Test sends two messages to the broker then restarts the broker with new + * configuration. + * + * If the test is running inVM the test validates that the new configuration + * has been applied. + * + * Validates that we only have two messages on the queue and then sends + * enough messages to trigger the alert. + * + * The alert is then validate. + * + * + * @throws Exception + */ + public void testAlertingReallyWorksWithChanges() throws Exception + { + // send some messages and nuke the logs + sendMessage(_session, _destination, 2); + _session.commit(); + // To prevent any failover/retry/connection dropped errors + _connection.close(); + + stopBroker(); + + _monitor.reset(); + + // Change max message count to 5, start broker and make sure that that's triggered at the right time + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".queues.maximumMessageCount", "5"); + + startBroker(); + + if (!isExternalBroker()) + { + assertEquals("Alert Max Msg Count is not correct", 5, ApplicationRegistry.getInstance().getVirtualHostRegistry(). + getVirtualHost(VIRTUALHOST).getQueueRegistry().getQueue(new AMQShortString(_destination.getQueueName())). + getMaximumMessageCount()); + } + + setupConnection(); + + // Validate the queue depth is as expected + long messageCount = ((AMQSession) _session).getQueueDepth((AMQDestination) _destination); + assertEquals("Broker has invalid message count for test", 2, messageCount); + + // Ensure the alert has not occured yet + assertLoggingNotYetOccured(MESSAGE_COUNT_ALERT); + + // Trigger the new value + sendMessage(_session, _destination, 3); + _session.commit(); + + // Validate that the alert occured. + wasAlertFired(); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java new file mode 100644 index 0000000000..2de6b08751 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java @@ -0,0 +1,281 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import java.io.File; +import java.util.List; + +public class ChannelLoggingTest extends AbstractTestLogging +{ + private static final String CHANNEL_PREFIX = "CHN-"; + + public void setUp() throws Exception + { + // set QPID_WORK to be [QPID_WORK|io.tmpdir]/ + setSystemProperty("QPID_WORK", + System.getProperty("QPID_WORK", + System.getProperty("java.io.tmpdir")) + + File.separator + getName()); + + //Start the broker + super.setUp(); + } + + /** + * Description: + * When a new Channel (JMS Session) is created this will be logged as a CHN-1001 Create message. The messages will contain the prefetch details about this new Channel. + * Input: + * + * 1. Running Broker + * 2. New JMS Session/Channel creation + * + * Output: + * CHN-1001 : Create : Prefetch + * + * Validation Steps: + * 3. The CHN ID is correct + * 4. The prefetch value matches that defined by the requesting client. + * + * @throws Exception - if an error occurs + */ + public void testChannelCreate() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Test that calling session.close gives us the expected output + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + List results = _monitor.findMatches(CHANNEL_PREFIX); + + // Validation + + assertEquals("CHN messages not logged", 1, results.size()); + + String log = getLog(results.get(0)); + // MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1001 : Create + //1 & 2 + validateMessageID("CHN-1001", log); + assertEquals("Incorrect Channel in actor:"+fromActor(log), 1, getChannelID(fromActor(log))); + + connection.close(); + } + + /** + * Description: + * The Java Broker implements consumer flow control for all ack modes except + * No-Ack. When a client connects the session's flow is initially set to + * Stopped. Verify this message appears + * + * Input: + * 1. Running broker + * 2. Create consumer + * Output: + * + * CHN-1002 : Flow Stopped + * + * Validation Steps: + * 4. The CHN ID is correct + * + * @throws Exception - if an error occurs + */ + + public void testChannelStartsFlowStopped() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Create a session to fill up + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = (Queue) getInitialContext().lookup(QUEUE); + MessageConsumer consumer = session.createConsumer(queue); + + connection.start(); + + List results = _monitor.findMatches(CHANNEL_PREFIX); + + // The last channel message should be: + // + // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow Off + + // Verify + int resultSize = results.size(); + String log = getLog(results.get(resultSize - 1)); + + validateMessageID("CHN-1002", log); + assertTrue("Message should be Flow Stopped", fromMessage(log).endsWith("Flow Stopped")); + + } + + /** + * Description: + * The Java Broker implements consumer flow control for all ack modes except + * No-Ack. When the client first attempts to receive a message then the Flow + * status of the Session is set to Started. + * + * Input: + * 1. Running broker + * 2. Create a consumer + * 3. Attempt to receive a message + * Output: + * + * CHN-1002 : Flow Started + * + * Validation Steps: + * 4. The CHN ID is correct + * + * @throws Exception - if an error occurs + */ + + public void testChannelStartConsumerFlowStarted() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Create a session to fill up + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = (Queue) getInitialContext().lookup(QUEUE); + MessageConsumer consumer = session.createConsumer(queue); + + connection.start(); + + //Call receive to send the Flow On message + consumer.receiveNoWait(); + + List results = _monitor.findMatches(CHANNEL_PREFIX); + + // The last two channel messages should be: + // + // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On + + // Verify + + int resultSize = results.size(); + String log = getLog(results.get(resultSize - 1)); + + validateMessageID("CHN-1002", log); + assertTrue("Message should be Flow Started", fromMessage(log).endsWith("Flow Started")); + + } + + /** + * Description: + * When the client gracefully closes the Connection then a CHN-1003 Close + * message will be issued. This must be the last message logged for this + * Channel. + * Input: + * 1. Running Broker + * 2. Connected Client + * 3. Client then requests that the Connection is closed + * Output: + * + * CHN-1003 : Close + * + * Validation Steps: + * 4. The MST ID is correct + * 5. This must be the last message logged for this Channel. + * + * @throws Exception - if an error occurs + */ + public void testChannelCloseViaConnectionClose() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Create a session + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Close the connection to verify the created session closing is logged. + connection.close(); + + List results = _monitor.findMatches(CHANNEL_PREFIX); + + // The last two channel messages should be: + // + // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On + + // Verify + + int resultSize = results.size(); + String log = getLog(results.get(resultSize - 1)); + + validateMessageID("CHN-1003", log); + assertTrue("Message should be Close:" + fromMessage(log), fromMessage(log).endsWith("Close")); + assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromActor(log))); + assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromSubject(log))); + } + + /** + * Description: + * When the client gracefully closes the Connection then a CHN-1003 Close + * message will be issued. This must be the last message logged for this + * Channel. + * Input: + * 1. Running Broker + * 2. Connected Client + * 3. Client then requests that the Channel is closed + * Output: + * + * CHN-1003 : Close + * + * Validation Steps: + * 4. The MST ID is correct + * 5. This must be the last message logged for this Channel. + * + * @throws Exception - if an error occurs + */ + public void testChannelCloseViaChannelClose() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Create a session and then close it + connection.createSession(false, Session.AUTO_ACKNOWLEDGE).close(); + + List results = _monitor.findMatches(CHANNEL_PREFIX); + + // The last two channel messages should be: + // + // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On + + // Verify + + int resultSize = results.size(); + String log = getLog(results.get(resultSize - 1)); + + validateMessageID("CHN-1003", log); + assertTrue("Message should be Close:" + fromMessage(getLog(log)), fromMessage(log).endsWith("Close")); + assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromActor(log))); + assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromSubject(log))); + } + +} \ No newline at end of file diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java new file mode 100644 index 0000000000..46f32b1414 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java @@ -0,0 +1,185 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.logging; + +import org.apache.qpid.test.unit.client.forwardall.Client; + +import javax.jms.Connection; +import java.io.File; +import java.util.List; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.SortedSet; +import java.util.Collections; +import java.util.TreeSet; + +public class ConnectionLoggingTest extends AbstractTestLogging +{ + private static final String CONNECTION_PREFIX = "CON-"; + + public void setUp() throws Exception + { + // set QPID_WORK to be [QPID_WORK|io.tmpdir]/ + setSystemProperty("QPID_WORK", + System.getProperty("QPID_WORK", + System.getProperty("java.io.tmpdir")) + + File.separator + getName()); + + //Start the broker + super.setUp(); + } + + /** + * Description: + * When a new connection is made to the broker this must be logged. + * + * Input: + * 1. Running Broker + * 2. Connecting client + * Output: + * CON-1001 : Open : Client ID {0}[ : Protocol Version : {1}] + * + * Validation Steps: + * 1. The CON ID is correct + * 2. This is the first CON message for that Connection + * + * @throws Exception - if an error occurs + */ + public void testConnectionOpen() throws Exception + { + assertLoggingNotYetOccured(CONNECTION_PREFIX); + + Connection connection = getConnection(); + + List results = _monitor.findMatches(CONNECTION_PREFIX); + + // Validation + // We should have at least three messages when running InVM but when running External + // we will get 0-10 negotiation on con:0 whcih may close at some random point + // MESSAGE [con:0(/127.0.0.1:46926)] CON-1001 : Open + // MESSAGE [con:0(/127.0.0.1:46926)] CON-1001 : Open : Protocol Version : 0-10 + // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open + // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open : Protocol Version : 0-9 + // MESSAGE [con:0(/127.0.0.1:46926)] CON-1002 : Close + // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9 + + //So check how many connections we have in the result set and extract the last one. + // When running InVM we will have con:0 and externally con:1 + + HashMap> connectionData = splitResultsOnConnectionID(results); + + // Get the last Integer from keySet of the ConnectionData + int connectionID = new TreeSet(connectionData.keySet()).last(); + + //Use just the data from the last connection for the test + results = connectionData.get(connectionID); + + // If we are running inVM we will get three open messagse, if running externally weN will also have + // open and close messages from the failed 0-10 negotiation + assertTrue("CON messages not logged:" + results.size(), results.size() >= 3); + + String log = getLog(results.get(0)); + // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open + //1 & 2 + validateMessageID("CON-1001",log); + + //We get the size so that we can validate the last three CON- messages + int resultsSize = results.size(); + // This is because when running externally we will also have logged the failed + // 0-10 negotiation messages + + // 3 - Assert the options are correct + log = getLog(results.get(resultsSize - 1)); + // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9 + validateMessageID("CON-1001",log); + assertTrue("Client ID option is not present", fromMessage(log).contains("Client ID :")); + assertTrue("Client ID value is not present", fromMessage(log).contains(connection.getClientID())); + + assertTrue("Protocol Version option is not present", fromMessage(log).contains("Protocol Version :")); + //fixme there is no way currently to find out the negotiated protocol version + // The delegate is the versioned class ((AMQConnection)connection)._delegate + + log = getLog(results.get(resultsSize - 2)); + // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Protocol Version : 0-9 + validateMessageID("CON-1001",log); + assertTrue("Protocol Version option is not present", fromMessage(log).contains("Protocol Version :")); + //fixme agani we should check the version + // Check that client ID is not present in log + assertTrue("Client ID option is present", !fromMessage(log).contains("Client ID :")); + + log = getLog(results.get(resultsSize - 3)); + validateMessageID("CON-1001",log); + // Check that PV is not present in log + assertTrue("Protocol Version option is present", !fromMessage(log).contains("Protocol Version :")); + // Check that client ID is not present in log + assertTrue("Client ID option is present", !fromMessage(log).contains("Client ID :")); + + connection.close(); + } + + /** + * Description: + * When a connected client closes the connection this will be logged as a CON-1002 message. + * Input: + * + * 1. Running Broker + * 2. Connected Client + * Output: + * + * CON-1002 : Close + * + * Validation Steps: + * 3. The CON ID is correct + * 4. This must be the last CON message for the Connection + * 5. It must be preceded by a CON-1001 for this Connection + */ + public void testConnectionClose() throws Exception + { + assertLoggingNotYetOccured(CONNECTION_PREFIX); + + // Open and then close the conneciton + getConnection().close(); + + List results = _monitor.findMatches(CONNECTION_PREFIX); + + // Validation + + // We should have at least four messages + assertTrue("CON messages not logged:" + results.size(), results.size() >= 4); + + //We get the size so that we can validate the last set of CON- messages + int resultsSize = results.size(); + + // Validate Close message occurs + String log = getLog(results.get(resultsSize - 1)); + validateMessageID("CON-1002",log); + assertTrue("Message does not end with close:" + log, log.endsWith("Close")); + + // Extract connection ID to validate there is a CON-1001 messasge for it + int connectionID = extractConnectionID(log); + + //Previous log message should be the open + log = getLog(results.get(resultsSize - 2)); + // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9 + validateMessageID("CON-1001",log); + assertEquals("Connection IDs do not match", connectionID, extractConnectionID(fromActor(log))); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java index 91732bc010..c182db5d78 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -28,19 +28,31 @@ import javax.jms.Message; import javax.jms.JMSException; import javax.naming.InitialContext; import javax.naming.NamingException; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.net.MalformedURLException; import java.util.ArrayList; import java.util.List; -import java.util.StringTokenizer; import java.util.Map; import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.net.MalformedURLException; + + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.store.DerbyMessageStore; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.jms.BrokerDetails; @@ -168,6 +180,7 @@ public class QpidTestCase extends TestCase // the connections created for a given test protected List _connections = new ArrayList(); + public static final String QUEUE = "queue"; public QpidTestCase(String name) { @@ -238,7 +251,8 @@ public class QpidTestCase extends TestCase public void run(TestResult testResult) { - if (_exclusionList != null && (_exclusionList.contains(getClass().getName() + "#*") || + if (_exclusionList != null && (_exclusionList.contains(getClass().getPackage().getName() + ".*") || + _exclusionList.contains(getClass().getName() + "#*") || _exclusionList.contains(getClass().getName() + "#" + getName()))) { _logger.info("Test: " + getName() + " is excluded"); @@ -490,6 +504,119 @@ public class QpidTestCase extends TestCase } } + + /** + * Attempt to set the Java Broker to use the BDBMessageStore for persistence + * Falling back to the DerbyMessageStore if + * + * @param virtualhost - The virtualhost to modify + * + * @throws ConfigurationException - when reading/writing existing configuration + * @throws IOException - When creating a temporary file. + */ + protected void makeVirtualHostPersistent(String virtualhost) + throws ConfigurationException, IOException + { + Class storeClass = DerbyMessageStore.class; + + Class bdb = null; + try + { + bdb = Class.forName("org.apache.qpid.store.berkleydb.BDBMessageStore"); + } + catch (ClassNotFoundException e) + { + // No BDB store, we'll use Derby instead. + } + + if (bdb != null) + { + storeClass = bdb; + } + + // First we munge the config file and, if we're in a VM, set up an additional logfile + XMLConfiguration configuration = new XMLConfiguration(_configFile); + configuration.setProperty("virtualhosts.virtualhost." + virtualhost + + ".store.class", storeClass.getName()); + configuration.setProperty("virtualhosts.virtualhost." + virtualhost + + ".store." + DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY, + "${work}"); + + File tmpFile = File.createTempFile("configFile", "test"); + tmpFile.deleteOnExit(); + configuration.save(tmpFile); + _configFile = tmpFile; + } + + /** + * Set a configuration Property for this test run. + * + * This creates a new configuration based on the current configuration + * with the specified property change. + * + * Multiple calls to this method will result in multiple temporary + * configuration files being created. + * + * @param property the configuration property to set + * @param value the new value + * @throws ConfigurationException when loading the current config file + * @throws IOException when writing the new config file + */ + protected void setConfigurationProperty(String property, String value) + throws ConfigurationException, IOException + { + XMLConfiguration configuration = new XMLConfiguration(_configFile); + + // If we are modifying a virtualhost value then we need to do so in + // the virtualhost.xml file as these values overwrite the values in + // the main config.xml file + if (property.startsWith("virtualhosts")) + { + // So locate the virtualhost.xml file and use the ServerConfiguration + // flatConfig method to get the interpolated value. + String vhostConfigFile = ServerConfiguration. + flatConfig(_configFile).getString("virtualhosts"); + + // Load the vhostConfigFile + XMLConfiguration vhostConfiguration = new XMLConfiguration(vhostConfigFile); + + // Set the value specified in to the vhostConfig. + // Remembering that property will be 'virtualhosts.virtulhost....' + // so we need to take off the 'virtualhosts.' from the start. + vhostConfiguration.setProperty(property.substring(property.indexOf(".") + 1), value); + + // Write out the new virtualhost config file + File tmpFile = File.createTempFile("virtualhost-configFile", ".xml"); + tmpFile.deleteOnExit(); + vhostConfiguration.save(tmpFile); + + // Change the property and value to be the new virtualhosts file + // so that then update the value in the main config file. + property = "virtualhosts"; + value = tmpFile.getAbsolutePath(); + } + + configuration.setProperty(property, value); + + // Write the new server config file + File tmpFile = File.createTempFile("configFile", ".xml"); + tmpFile.deleteOnExit(); + configuration.save(tmpFile); + + _logger.info("Qpid Test Case now using configuration File:" + + tmpFile.getAbsolutePath()); + + _configFile = tmpFile; + } + + /** + * Set a System property for the duration of this test. + * + * When the test run is complete the value will be reverted. + + * @param property the property to set + * @param value the new value to use + */ protected void setSystemProperty(String property, String value) { if (!_setProperties.containsKey(property)) @@ -531,6 +658,21 @@ public class QpidTestCase extends TestCase return _brokerVersion.equals(VERSION_010); } + protected boolean isJavaBroker() + { + return _brokerLanguage.equals("java"); + } + + protected boolean isCppBroker() + { + return _brokerLanguage.equals("cpp"); + } + + protected boolean isExternalBroker() + { + return !_broker.equals("vm"); + } + public void restartBroker() throws Exception { restartBroker(0); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java new file mode 100644 index 0000000000..84010453e1 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java @@ -0,0 +1,176 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import org.apache.log4j.FileAppender; +import org.apache.log4j.Logger; +import org.apache.log4j.SimpleLayout; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.List; + +/** + * Utility to simplify the monitoring of Log4j file output + * + * Monitoring of a given log file can be done alternatively the Monitor will + * add a new log4j FileAppender to the root Logger to gather all the available + * logging for monitoring + */ +public class LogMonitor +{ + // The file that the log statements will be written to. + private File _logfile; + + /** + * Create a new LogMonitor that creates a new Log4j Appender and monitors + * all log4j output via the current configuration. + * + * @throws IOException if there is a problem creating the temporary file. + */ + public LogMonitor() throws IOException + { + this(null); + } + + /** + * Create a new LogMonitor on the specified file if the file does not exist + * or the value is null then a new Log4j appender will be added and + * monitoring set up on that appender. + * + * NOTE: for the appender to receive any value the RootLogger will need to + * have the level correctly configured.ng + * + * @param file the file to monitor + * + * @throws IOException if there is a problem creating a temporary file + */ + public LogMonitor(File file) throws IOException + { + if (file != null && file.exists()) + { + _logfile = file; + } + else + { + // This is mostly for running the test outside of the ant setup + _logfile = File.createTempFile("LogMonitor", ".log"); + FileAppender appender = new FileAppender(new SimpleLayout(), + _logfile.getAbsolutePath()); + appender.setFile(_logfile.getAbsolutePath()); + appender.setImmediateFlush(true); + Logger.getRootLogger().addAppender(appender); + } + } + + /** + * Checks the log for instances of the search string. + * + * The pattern parameter can take any valid argument used in String.contains() + * + * {@see String.contains(CharSequences)} + * + * @param pattern the search string + * + * @return a list of matching lines from the log + * + * @throws IOException if there is a problem with the file + */ + public List findMatches(String pattern) throws IOException + { + return FileUtils.searchFile(_logfile, pattern); + } + + /** + * Checks the log file for a given message to appear. + * + * @param message the message to wait for in the log + * @param wait the time in ms to wait for the message to occur + * + * @return true if the message was found + * + * @throws java.io.FileNotFoundException if the Log file can nolonger be found + * @throws IOException thrown when reading the log file + */ + public boolean waitForMessage(String message, long wait) + throws FileNotFoundException, IOException + { + // Loop through alerts until we're done or wait ms seconds have passed, + // just in case the logfile takes a while to flush. + BufferedReader reader = new BufferedReader(new FileReader(_logfile)); + boolean found = false; + long endtime = System.currentTimeMillis() + wait; + while (!found && System.currentTimeMillis() < endtime) + { + while (reader.ready()) + { + String line = reader.readLine(); + if (line.contains(message)) + { + found = true; + } + } + } + + return found; + } + + /** + * Read the log file in to memory as a String + * + * @return the current contents of the log file + * + * @throws java.io.FileNotFoundException if the Log file can nolonger be found + * @throws IOException thrown when reading the log file + */ + public String readFile() throws FileNotFoundException, IOException + { + return FileUtils.readFileAsString(_logfile); + } + + /** + * Return a File reference to the monitored file + * + * @return the file being monitored + */ + public File getMonitoredFile() + { + return _logfile; + } + + /** + * Clears the log file and writes: 'Log Monitor Reset' at the start of the file + * + * @throws java.io.FileNotFoundException if the Log file can nolonger be found + * @throws IOException thrown if there is a problem with the log file + */ + public void reset() throws FileNotFoundException, IOException + { + OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(_logfile)); + writer.write("Log Monitor Reset\n"); + writer.close(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java new file mode 100644 index 0000000000..f4dade5660 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java @@ -0,0 +1,302 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +public class LogMonitorTest extends TestCase +{ + + /** + * Test that a new file is created when attempting to set up a monitor with + * the default constructor. + */ + public void testMonitor() + { + // Validate that a NPE is thrown with null input + try + { + LogMonitor montior = new LogMonitor(); + //Validte that the monitor is now running on a new file + assertTrue("New file does not have correct name:" + montior. + getMonitoredFile().getName(), + montior.getMonitoredFile().getName().contains("LogMonitor")); + } + catch (IOException ioe) + { + fail("IOE thrown:" + ioe); + } + } + + /** + * Test that creation of a monitor on an existing file is possible + * + * This also tests taht getMonitoredFile works + * + * @throws IOException if there is a problem creating the temporary file + */ + public void testMonitorNormalFile() throws IOException + { + File testFile = File.createTempFile("testMonitorFile", ".log"); + testFile.deleteOnExit(); + + LogMonitor monitor; + + //Ensure that we can create a monitor on a file + try + { + monitor = new LogMonitor(testFile); + assertEquals(testFile, monitor.getMonitoredFile()); + } + catch (IOException ioe) + { + fail("IOE thrown:" + ioe); + } + + } + + /** + * Test that a new file is created when attempting to set up a monitor on + * a null input value. + */ + public void testMonitorNullFile() + { + // Validate that a NPE is thrown with null input + try + { + LogMonitor montior = new LogMonitor(null); + //Validte that the monitor is now running on a new file + assertTrue("New file does not have correct name:" + montior. + getMonitoredFile().getName(), + montior.getMonitoredFile().getName().contains("LogMonitor")); + } + catch (IOException ioe) + { + fail("IOE thrown:" + ioe); + } + } + + /** + * Test that a new file is created when attempting to set up a monitor on + * a non existing file. + * + * @throws IOException if there is a problem setting up the nonexistent file + */ + public void testMonitorNonExistentFile() throws IOException + { + //Validate that we get a FileNotFound if the file does not exist + + File nonexist = File.createTempFile("nonexist", ".out"); + + assertTrue("Unable to delete file for our test", nonexist.delete()); + + assertFalse("Unable to test as our test file exists.", nonexist.exists()); + + try + { + LogMonitor montior = new LogMonitor(nonexist); + //Validte that the monitor is now running on a new file + assertTrue("New file does not have correct name:" + montior. + getMonitoredFile().getName(), + montior.getMonitoredFile().getName().contains("LogMonitor")); + } + catch (IOException ioe) + { + fail("IOE thrown:" + ioe); + } + } + + /** + * Test that Log file matches logged messages. + * + * @throws java.io.IOException if there is a problem creating LogMontior + */ + public void testFindMatches_Match() throws IOException + { + LogMonitor monitor = new LogMonitor(); + + String message = getName() + ": Test Message"; + + Logger.getRootLogger().warn(message); + + validateLogContainsMessage(monitor, message); + } + + /** + * Test that Log file does not match a message not logged. + * + * @throws java.io.IOException if there is a problem creating LogMontior + */ + public void testFindMatches_NoMatch() throws IOException + { + LogMonitor monitor = new LogMonitor(); + + String message = getName() + ": Test Message"; + + Logger.getRootLogger().warn(message); + + String notLogged = "This text was not logged"; + + validateLogDoesNotContainsMessage(monitor, notLogged); + } + + public void testWaitForMessage_Found() throws IOException + { + LogMonitor monitor = new LogMonitor(); + + String message = getName() + ": Test Message"; + + long TIME_OUT = 2000; + + logMessageWithDelay(message, TIME_OUT / 2); + + assertTrue("Message was not logged ", + monitor.waitForMessage(message, TIME_OUT)); + } + + public void testWaitForMessage_Timeout() throws IOException + { + LogMonitor monitor = new LogMonitor(); + + String message = getName() + ": Test Message"; + + long TIME_OUT = 2000; + + logMessageWithDelay(message, TIME_OUT); + + // Verify that we can time out waiting for a message + assertFalse("Message was logged ", + monitor.waitForMessage(message, TIME_OUT / 2)); + + // Verify that the message did eventually get logged. + assertTrue("Message was never logged.", + monitor.waitForMessage(message, TIME_OUT)); + } + + public void testReset() throws IOException + { + LogMonitor monitor = new LogMonitor(); + + String message = getName() + ": Test Message"; + + Logger.getRootLogger().warn(message); + + validateLogContainsMessage(monitor, message); + + String LOG_RESET_TEXT = "Log Monitor Reset"; + + validateLogDoesNotContainsMessage(monitor, LOG_RESET_TEXT); + + monitor.reset(); + + validateLogContainsMessage(monitor, LOG_RESET_TEXT); + + assertEquals(LOG_RESET_TEXT + "\n", monitor.readFile()); + } + + public void testRead() throws IOException + { + LogMonitor monitor = new LogMonitor(); + + String message = getName() + ": Test Message"; + + Logger.getRootLogger().warn(message); + + String fileContents = monitor.readFile(); + + assertTrue("Logged message not found when reading file.", + fileContents.contains(message)); + } + + /****************** Helpers ******************/ + + /** + * Validate that the LogMonitor does not match the given string in the log + * + * @param log The LogMonitor to check + * @param message The message to check for + * + * @throws IOException if a problems occurs + */ + protected void validateLogDoesNotContainsMessage(LogMonitor log, String message) + throws IOException + { + List results = log.findMatches(message); + + assertNotNull("Null results returned.", results); + + assertEquals("Incorrect result set size", 0, results.size()); + } + + /** + * Validate that the LogMonitor can match the given string in the log + * + * @param log The LogMonitor to check + * @param message The message to check for + * + * @throws IOException if a problems occurs + */ + protected void validateLogContainsMessage(LogMonitor log, String message) + throws IOException + { + List results = log.findMatches(message); + + assertNotNull("Null results returned.", results); + + assertEquals("Incorrect result set size", 1, results.size()); + + assertTrue("Logged Message'" + message + "' not present in results:" + + results.get(0), results.get(0).contains(message)); + } + + /** + * Create a new thread to log the given message after the set delay + * + * @param message the messasge to log + * @param delay the delay (ms) to wait before logging + */ + private void logMessageWithDelay(final String message, final long delay) + { + new Thread(new Runnable() + { + + public void run() + { + try + { + Thread.sleep(delay); + } + catch (InterruptedException e) + { + //ignore + } + + Logger.getRootLogger().warn(message); + } + }).start(); + } + +} diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes index 69077a97c8..643a26bb9d 100644 --- a/qpid/java/test-profiles/010Excludes +++ b/qpid/java/test-profiles/010Excludes @@ -77,3 +77,5 @@ org.apache.qpid.server.store.PersistentStoreTest#* // QPID-1225 : Temporary remove this test until the problem has been addressed org.apache.qpid.server.security.acl.SimpleACLTest#testClientPublishInvalidQueueSuccess +// CPP Broker does not follow the same Logging convention as the Java broker +org.apache.qpid.server.logging.* -- cgit v1.2.1