summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-05 10:47:52 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-05 10:47:52 +0000
commit6eb1a4b5693bbbc75319e19bc4686aa21f2bc5ec (patch)
tree88e5c7f734ededf30cc2c3564002f65e3e75bda3
parentf7e01a65f292af81813cbfc7f7bb60ce70139074 (diff)
downloadqpid-python-6eb1a4b5693bbbc75319e19bc4686aa21f2bc5ec.tar.gz
Merged from trunk up to r800440
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@821749 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.cpp13
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h16
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp51
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h4
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.h8
-rw-r--r--qpid/cpp/src/qpid/broker/DtxTimeout.h6
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h6
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h3
-rw-r--r--qpid/cpp/src/qpid/broker/QueueCleaner.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/QueueCleaner.h12
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h7
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp21
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.h9
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp17
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionCodec.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/ErrorCheck.cpp80
-rw-r--r--qpid/cpp/src/qpid/cluster/ErrorCheck.h9
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.h9
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.h4
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp6
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h6
-rw-r--r--qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp1
-rw-r--r--qpid/cpp/src/qpid/replication/ReplicationExchange.cpp32
-rw-r--r--qpid/cpp/src/qpid/replication/constants.h1
-rw-r--r--qpid/cpp/src/qpid/sys/Timer.cpp26
-rw-r--r--qpid/cpp/src/tests/Makefile.am2
-rw-r--r--qpid/cpp/src/tests/PartialFailure.cpp27
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp37
-rw-r--r--qpid/cpp/src/tests/allSegmentTypes.h128
-rwxr-xr-xqpid/cpp/src/tests/replication_test63
-rw-r--r--qpid/cpp/xml/cluster.xml4
-rw-r--r--qpid/gentools/templ.java/model/ProtocolVersionListClass.vm6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java19
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java96
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java41
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java236
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java113
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java34
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java31
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java66
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java32
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java24
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java28
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java59
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java32
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java16
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java11
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java11
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java286
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java5
-rw-r--r--qpid/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm28
-rw-r--r--qpid/java/common/templates/model/ProtocolVersionListClass.vm7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/AlertingTest.java182
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java262
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java202
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java281
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java185
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java150
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java176
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java302
-rw-r--r--qpid/java/test-profiles/010Excludes2
92 files changed, 2632 insertions, 1235 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
index c1de5e2dec..96d5146c30 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -31,7 +31,8 @@ namespace amqp_0_10 {
using sys::Mutex;
Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient)
- : frameQueueClosed(false), output(o), identifier(id), initialized(false), isClient(_isClient), buffered(0)
+ : frameQueueClosed(false), output(o), identifier(id), initialized(false),
+ isClient(_isClient), buffered(0), version(0,10)
{}
void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c) {
@@ -44,7 +45,9 @@ size_t Connection::decode(const char* buffer, size_t size) {
//read in protocol header
framing::ProtocolInitiation pi;
if (pi.decode(in)) {
- //TODO: check the version is correct
+ if(!(pi==version))
+ throw Exception(QPID_MSG("Unsupported version: " << pi
+ << " supported version " << version));
QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi << ")");
}
initialized = true;
@@ -128,7 +131,11 @@ void Connection::send(framing::AMQFrame& f) {
}
framing::ProtocolVersion Connection::getVersion() const {
- return framing::ProtocolVersion(0,10);
+ return version;
+}
+
+void Connection::setVersion(const framing::ProtocolVersion& v) {
+ version = v;
}
size_t Connection::getBuffered() const {
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
index 32aadff105..6fd51381fc 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
@@ -55,6 +55,7 @@ class Connection : public sys::ConnectionCodec,
bool initialized;
bool isClient;
size_t buffered;
+ framing::ProtocolVersion version;
public:
QPID_BROKER_EXTERN Connection(sys::OutputControl&, const std::string& id, bool isClient);
@@ -71,6 +72,10 @@ class Connection : public sys::ConnectionCodec,
void send(framing::AMQFrame&);
framing::ProtocolVersion getVersion() const;
size_t getBuffered() const;
+
+ /** Used by cluster code to set a special version on "update" connections. */
+ // FIXME aconway 2009-07-30: find a cleaner mechanism for this.
+ void setVersion(const framing::ProtocolVersion&);
};
}} // namespace qpid::amqp_0_10
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index ac8f835398..0517ceca95 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -36,7 +36,6 @@
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Vhost.h"
#include "qpid/broker/System.h"
-#include "qpid/broker/Timer.h"
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
@@ -49,6 +48,7 @@
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Timer.h"
#include "qpid/RefCounted.h"
#include "qpid/broker/AclModule.h"
@@ -115,13 +115,14 @@ public:
private:
std::string getHome();
};
-
+
private:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
void declareStandardExchange(const std::string& name, const std::string& type);
boost::shared_ptr<sys::Poller> poller;
+ sys::Timer timer;
Options config;
ProtocolFactoryMap protocolFactories;
std::auto_ptr<MessageStore> store;
@@ -132,7 +133,6 @@ public:
ExchangeRegistry exchanges;
LinkRegistry links;
boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
- Timer timer;
DtxManager dtxManager;
SessionManager sessionManager;
management::ManagementAgent* managementAgent;
@@ -148,8 +148,6 @@ public:
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
public:
-
-
virtual ~Broker();
QPID_BROKER_EXTERN Broker(const Options& configuration);
@@ -188,7 +186,7 @@ public:
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
-
+
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
@@ -197,7 +195,7 @@ public:
management::Manageable::status_t ManagementMethod (uint32_t methodId,
management::Args& args,
std::string& text);
-
+
/** Add to the broker's protocolFactorys */
void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>);
@@ -229,7 +227,7 @@ public:
boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
- Timer& getTimer() { return timer; }
+ sys::Timer& getTimer() { return timer; }
boost::function<std::vector<Url> ()> getKnownBrokers;
@@ -242,7 +240,5 @@ public:
};
}}
-
-
#endif /*!_Broker_*/
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 91c95289ac..a1a3c6ada7 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -49,35 +49,25 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
namespace broker {
-struct ConnectionTimeoutTask : public TimerTask {
- Timer& timer;
+struct ConnectionTimeoutTask : public sys::TimerTask {
+ sys::Timer& timer;
Connection& connection;
- AbsTime expires;
- ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) :
+ ConnectionTimeoutTask(uint16_t hb, sys::Timer& t, Connection& c) :
TimerTask(Duration(hb*2*TIME_SEC)),
timer(t),
- connection(c),
- expires(AbsTime::now(), duration)
+ connection(c)
{}
- void touch()
- {
- expires = AbsTime(AbsTime::now(), duration);
+ void touch() {
+ restart();
}
void fire() {
- // This is the best we can currently do to avoid a destruction/fire race
- if (isCancelled()) return;
- if (expires < AbsTime::now()) {
- // If we get here then we've not received any traffic in the timeout period
- // Schedule closing the connection for the io thread
- QPID_LOG(error, "Connection timed out: closing");
- connection.abort();
- } else {
- reset();
- timer.add(this);
- }
+ // If we get here then we've not received any traffic in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection timed out: closing");
+ connection.abort();
}
};
@@ -338,25 +328,22 @@ void Connection::setSecureConnection(SecureConnection* s)
adapter.setSecureConnection(s);
}
-struct ConnectionHeartbeatTask : public TimerTask {
- Timer& timer;
+struct ConnectionHeartbeatTask : public sys::TimerTask {
+ sys::Timer& timer;
Connection& connection;
- ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
+ ConnectionHeartbeatTask(uint16_t hb, sys::Timer& t, Connection& c) :
TimerTask(Duration(hb*TIME_SEC)),
timer(t),
connection(c)
{}
void fire() {
- // This is the best we can currently do to avoid a destruction/fire race
- if (!isCancelled()) {
- // Setup next firing
- reset();
- timer.add(this);
-
- // Send Heartbeat
- connection.sendHeartbeat();
- }
+ // Setup next firing
+ setupNextFire();
+ timer.add(this);
+
+ // Send Heartbeat
+ connection.sendHeartbeat();
}
};
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 731d36d83b..42409969b9 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -152,8 +152,8 @@ class Connection : public sys::ConnectionInputHandler,
qmf::org::apache::qpid::broker::Connection* mgmtObject;
LinkRegistry& links;
management::ManagementAgent* agent;
- Timer& timer;
- boost::intrusive_ptr<TimerTask> heartbeatTimer;
+ sys::Timer& timer;
+ boost::intrusive_ptr<sys::TimerTask> heartbeatTimer;
boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
ErrorListener* errorListener;
bool shadow;
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp
index 9eb077716d..a2ab20ec44 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp
@@ -22,6 +22,7 @@
#include "qpid/broker/DtxTimeout.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Timer.h"
#include "qpid/ptr_map.h"
#include <boost/format.hpp>
@@ -33,7 +34,7 @@ using qpid::ptr_map_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxManager::DtxManager(Timer& t) : store(0), timer(t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(t) {}
DtxManager::~DtxManager() {}
@@ -130,8 +131,7 @@ void DtxManager::setTimeout(const std::string& xid, uint32_t secs)
}
timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid));
record->setTimeout(timeout);
- timer.add(boost::static_pointer_cast<TimerTask>(timeout));
-
+ timer.add(timeout);
}
uint32_t DtxManager::getTimeout(const std::string& xid)
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h
index 4b4fde3b39..680b62eeb2 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.h
+++ b/qpid/cpp/src/qpid/broker/DtxManager.h
@@ -24,9 +24,9 @@
#include <boost/ptr_container/ptr_map.hpp>
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxWorkRecord.h"
-#include "qpid/broker/Timer.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Timer.h"
#include "qpid/sys/Mutex.h"
namespace qpid {
@@ -35,7 +35,7 @@ namespace broker {
class DtxManager{
typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
- struct DtxCleanup : public TimerTask
+ struct DtxCleanup : public sys::TimerTask
{
DtxManager& mgr;
const std::string& xid;
@@ -47,14 +47,14 @@ class DtxManager{
WorkMap work;
TransactionalStore* store;
qpid::sys::Mutex lock;
- Timer& timer;
+ qpid::sys::Timer& timer;
void remove(const std::string& xid);
DtxWorkRecord* getWork(const std::string& xid);
DtxWorkRecord* createWork(std::string xid);
public:
- DtxManager(Timer&);
+ DtxManager(qpid::sys::Timer&);
~DtxManager();
void start(const std::string& xid, DtxBuffer::shared_ptr work);
void join(const std::string& xid, DtxBuffer::shared_ptr work);
diff --git a/qpid/cpp/src/qpid/broker/DtxTimeout.h b/qpid/cpp/src/qpid/broker/DtxTimeout.h
index 3c468b3373..680a210e4f 100644
--- a/qpid/cpp/src/qpid/broker/DtxTimeout.h
+++ b/qpid/cpp/src/qpid/broker/DtxTimeout.h
@@ -22,7 +22,7 @@
#define _DtxTimeout_
#include "qpid/Exception.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
namespace qpid {
namespace broker {
@@ -31,12 +31,12 @@ class DtxManager;
struct DtxTimeoutException : public Exception {};
-struct DtxTimeout : public TimerTask
+struct DtxTimeout : public sys::TimerTask
{
const uint32_t timeout;
DtxManager& mgr;
const std::string xid;
-
+
DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
void fire();
};
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index 3672f71515..c70392eb23 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -41,18 +41,18 @@ namespace _qmf = qmf::org::apache::qpid::broker;
// factored: The persistence element and maintenance element
// should be factored separately
LinkRegistry::LinkRegistry () :
- broker(0),
+ broker(0), timer(0),
parent(0), store(0), passive(false), passiveChanged(false),
realm("")
{
}
LinkRegistry::LinkRegistry (Broker* _broker) :
- broker(_broker),
- parent(0), store(0), passive(false), passiveChanged(false),
+ broker(_broker), timer(&broker->getTimer()),
+ parent(0), store(0), passive(false), passiveChanged(false),
realm(broker->getOptions().realm)
{
- timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
+ timer->add (new Periodic(*this));
}
LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
@@ -61,7 +61,7 @@ LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
void LinkRegistry::Periodic::fire ()
{
links.periodicMaintenance ();
- links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links)));
+ links.timer->add (new Periodic(links));
}
void LinkRegistry::periodicMaintenance ()
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index 8d1a252f54..d1a4201c82 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -25,9 +25,9 @@
#include <map>
#include "qpid/broker/Bridge.h"
#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/Timer.h"
#include "qpid/Address.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
@@ -41,7 +41,7 @@ namespace broker {
// Declare a timer task to manage the establishment of link connections and the
// re-establishment of lost link connections.
- struct Periodic : public TimerTask
+ struct Periodic : public sys::TimerTask
{
LinkRegistry& links;
@@ -62,7 +62,7 @@ namespace broker {
qpid::sys::Mutex lock;
Broker* broker;
- Timer timer;
+ sys::Timer* timer;
management::Manageable* parent;
MessageStore* store;
bool passive;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index ee5831fed0..08ee133981 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -677,21 +677,25 @@ void Queue::setLastNodeFailure()
{
if (persistLastNode){
Mutex::ScopedLock locker(messageLock);
- for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
- if (lastValueQueue) checkLvqReplace(*i);
- // don't force a message twice to disk.
- if(!i->payload->isStoredOnQueue(shared_from_this())) {
- i->payload->forcePersistent();
- if (i->payload->isForcedPersistent() ){
- enqueue(0, i->payload);
+ try {
+ for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
+ if (lastValueQueue) checkLvqReplace(*i);
+ // don't force a message twice to disk.
+ if(!i->payload->isStoredOnQueue(shared_from_this())) {
+ i->payload->forcePersistent();
+ if (i->payload->isForcedPersistent() ){
+ enqueue(0, i->payload);
+ }
}
- }
- }
+ }
+ } catch (const std::exception& e) {
+ // Could not go into last node standing (for example journal not large enough)
+ QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what());
+ }
inLastNodeFailure = true;
}
}
-
// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
@@ -1012,6 +1016,10 @@ void Queue::setPosition(SequenceNumber n) {
sequence = n;
}
+SequenceNumber Queue::getPosition() {
+ return sequence;
+}
+
int Queue::getEventMode() { return eventMode; }
void Queue::setQueueEventManager(QueueEvents& mgr)
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 475766ae30..6703d06bbb 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -324,6 +324,9 @@ namespace qpid {
* Used by cluster to replicate queues.
*/
void setPosition(framing::SequenceNumber pos);
+ /** return current position sequence number for the next message on the queue.
+ */
+ framing::SequenceNumber getPosition();
int getEventMode();
void setQueueEventManager(QueueEvents&);
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
index 7a75ed013b..83f3f83520 100644
--- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
@@ -26,15 +26,15 @@
namespace qpid {
namespace broker {
-QueueCleaner::QueueCleaner(QueueRegistry& q, Timer& t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
void QueueCleaner::start(qpid::sys::Duration p)
{
- task = boost::intrusive_ptr<TimerTask>(new Task(*this, p));
+ task = new Task(*this, p);
timer.add(task);
}
-QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : TimerTask(d), parent(p) {}
+QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d), parent(p) {}
void QueueCleaner::Task::fire()
{
@@ -44,7 +44,7 @@ void QueueCleaner::Task::fire()
void QueueCleaner::fired()
{
queues.eachQueue(boost::bind(&Queue::purgeExpired, _1));
- task->reset();
+ task->setupNextFire();
timer.add(task);
}
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.h b/qpid/cpp/src/qpid/broker/QueueCleaner.h
index 1ae72c19f5..c351cadd8a 100644
--- a/qpid/cpp/src/qpid/broker/QueueCleaner.h
+++ b/qpid/cpp/src/qpid/broker/QueueCleaner.h
@@ -23,7 +23,7 @@
*/
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
namespace qpid {
namespace broker {
@@ -35,10 +35,10 @@ class QueueRegistry;
class QueueCleaner
{
public:
- QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, Timer& timer);
+ QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
private:
- class Task : public TimerTask
+ class Task : public sys::TimerTask
{
public:
Task(QueueCleaner& parent, qpid::sys::Duration duration);
@@ -46,10 +46,10 @@ class QueueCleaner
private:
QueueCleaner& parent;
};
-
- boost::intrusive_ptr<TimerTask> task;
+
+ boost::intrusive_ptr<sys::TimerTask> task;
QueueRegistry& queues;
- Timer& timer;
+ sys::Timer& timer;
void fired();
};
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 1c8e3ffebc..d4e5cfaa67 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -25,7 +25,7 @@
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/RateFlowcontrol.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
@@ -49,6 +49,7 @@ using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::AbsTime;
+//using qpid::sys::Timer;
namespace _qmf = qmf::org::apache::qpid::broker;
SessionState::SessionState(
@@ -206,10 +207,10 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN
}
}
-struct ScheduledCreditTask : public TimerTask {
- Timer& timer;
+struct ScheduledCreditTask : public sys::TimerTask {
+ sys::Timer& timer;
SessionState& sessionState;
- ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t,
+ ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t,
SessionState& s) :
TimerTask(d),
timer(t),
@@ -218,15 +219,13 @@ struct ScheduledCreditTask : public TimerTask {
void fire() {
// This is the best we can currently do to avoid a destruction/fire race
- if (!isCancelled()) {
- sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
- }
+ sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
}
void sendCredit() {
if ( !sessionState.processSendCredit(0) ) {
QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
- reset();
+ setupNextFire();
timer.add(this);
}
}
@@ -269,7 +268,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
if ( !processSendCredit(1) ) {
QPID_LOG(debug, getId() << ": Schedule sending credit");
- Timer& timer = getBroker().getTimer();
+ sys::Timer& timer = getBroker().getTimer();
// Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
flowControlTimer = new ScheduledCreditTask(d, timer, *this);
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index d66c449b06..67fd4f4f38 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -48,6 +48,10 @@ namespace framing {
class AMQP_ClientProxy;
}
+namespace sys {
+class TimerTask;
+}
+
namespace broker {
class Broker;
@@ -56,7 +60,6 @@ class Message;
class SessionHandler;
class SessionManager;
class RateFlowcontrol;
-struct TimerTask;
/**
* Broker-side session state includes session's handler chains, which
@@ -153,7 +156,7 @@ class SessionState : public qpid::SessionState,
// State used for producer flow control (rate limited)
qpid::sys::Mutex rateLock;
boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
- boost::intrusive_ptr<TimerTask> flowControlTimer;
+ boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
friend class SessionManager;
};
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index 53e8bc5239..f69032b26d 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -360,8 +360,11 @@ size_t TCPConnector::decode(const char* buffer, size_t size)
if (!initiated) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
- //TODO: check the version is correct
QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
+ if(!(protocolInit==version)){
+ throw Exception(QPID_MSG("Unsupported version: " << protocolInit
+ << " supported version " << version));
+ }
}
initiated = true;
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index c5ac2ecfdc..ca3a7fa257 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -66,7 +66,7 @@
*
* Events are either
* - Connection events: non-0 connection number and are associated with a connection.
- * - Cluster Events: 0 connection number, are not associated with a connectin.
+ * - Cluster Events: 0 connection number, are not associated with a connection.
*
* Events are further categorized as:
* - Control: carries method frame(s) that affect cluster behavior.
@@ -149,7 +149,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* sensible reporting of an attempt to mix different versions in a
* cluster.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1;
+const uint32_t Cluster::CLUSTER_VERSION = 2;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -163,7 +163,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); }
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
- void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
+ void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
void shutdown() { cluster.shutdown(member, l); }
@@ -869,15 +869,12 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
-void Cluster::errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&) {
- // If we handle an errorCheck at this point (rather than in the
- // ErrorCheck class) then we have processed succesfully past the
- // point of the error.
- if (state >= CATCHUP && type != ERROR_TYPE_NONE) {
- QPID_LOG(notice, *this << " error " << frameSeq << " did not occur locally.");
- mcast.mcastControl(
- ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self);
- }
+void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
+ // If we see an errorCheck here (rather than in the ErrorCheck
+ // class) then we have processed succesfully past the point of the
+ // error.
+ if (state >= CATCHUP) // Don't respond pre catchup, we don't know what happened
+ error.respondNone(from, type, frameSeq);
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 87280f682b..a95f2ab60e 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -152,7 +152,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
- void errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&);
+ void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void shutdown(const MemberId&, Lock&);
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
index e4e0e50fe3..ca5237d6b1 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -71,7 +71,7 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : fra
joiners[id] = url;
}
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_)
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, framing::SequenceNumber frameSeq_)
: frameSeq(frameSeq_)
{
std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h
index 1b891f73f0..5735a6335d 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h
@@ -25,6 +25,7 @@
#include "qpid/cluster/types.h"
#include "qpid/Url.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
+#include "qpid/framing/SequenceNumber.h"
#include <boost/function.hpp>
#include <boost/optional.hpp>
@@ -53,7 +54,7 @@ class ClusterMap {
ClusterMap();
ClusterMap(const MemberId& id, const Url& url, bool isReady);
- ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq);
+ ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, framing::SequenceNumber frameSeq);
/** Update from config change.
*@return true if member set changed.
@@ -92,8 +93,8 @@ class ClusterMap {
*/
static Set intersection(const Set& a, const Set& b);
- uint64_t getFrameSeq() { return frameSeq; }
- uint64_t incrementFrameSeq() { return ++frameSeq; }
+ framing::SequenceNumber getFrameSeq() { return frameSeq; }
+ framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; }
/** Clear out all knowledge of joiners & members, just keep alive set */
void clearStatus() { joiners.clear(); members.clear(); }
@@ -103,7 +104,7 @@ class ClusterMap {
Map joiners, members;
Set alive;
- uint64_t frameSeq;
+ framing::SequenceNumber frameSeq;
friend std::ostream& operator<<(std::ostream&, const Map&);
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 03a06e1c09..4cc977d14a 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -311,7 +311,7 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str
output.setSendMax(sendMax);
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, const framing::SequenceNumber& frameSeq) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
consumerNumbering.clear();
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index e15c23ccf2..2799cc9fe1 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -122,7 +122,7 @@ class Connection :
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
- void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, const framing::SequenceNumber& frameSeq);
void retractOffer();
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
index 0c791cdf44..4ff8b0a4a3 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -38,24 +38,27 @@ using namespace framing;
sys::ConnectionCodec*
ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
if (v == ProtocolVersion(0, 10))
- return new ConnectionCodec(out, id, cluster, false, false);
- else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10))
- return new ConnectionCodec(out, id, cluster, true, false); // Catch-up connection
+ return new ConnectionCodec(v, out, id, cluster, false, false);
+ else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
+ return new ConnectionCodec(v, out, id, cluster, true, false);
return 0;
}
// Used for outgoing Link connections
sys::ConnectionCodec*
ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) {
- return new ConnectionCodec(out, logId, cluster, false, true);
+ return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true);
}
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink)
- : codec(out, logId, isLink),
- interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
+ConnectionCodec::ConnectionCodec(
+ const ProtocolVersion& v, sys::OutputControl& out,
+ const std::string& logId, Cluster& cluster, bool catchUp, bool isLink
+) : codec(out, logId, isLink),
+ interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
+ codec.setVersion(v);
}
ConnectionCodec::~ConnectionCodec() {}
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h b/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
index ea01b7abb9..4ff738b603 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
+++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
@@ -56,7 +56,8 @@ class ConnectionCodec : public sys::ConnectionCodec {
sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
};
- ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink);
+ ConnectionCodec(const framing::ProtocolVersion&, sys::OutputControl& out,
+ const std::string& logId, Cluster& c, bool catchUp, bool isLink);
~ConnectionCodec();
// ConnectionCodec functions.
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
index 0a16d492e4..35be055d06 100644
--- a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -45,11 +45,11 @@ ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) {
}
void ErrorCheck::error(
- Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms, const std::string& msg)
+ Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, const std::string& msg)
{
// Detected a local error, inform cluster and set error state.
assert(t != ERROR_TYPE_NONE); // Must be an error.
- assert(type == ERROR_TYPE_NONE); // Can only be called while processing
+ assert(type == ERROR_TYPE_NONE); // Can't be called when already in an error state.
type = t;
unresolved = ms;
frameSeq = seq;
@@ -59,7 +59,7 @@ void ErrorCheck::error(
<< " error " << frameSeq << " on " << c << ": " << msg
<< " must be resolved with: " << unresolved);
mcast.mcastControl(
- ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
+ ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
// If there are already frames queued up by a previous error, review
// them with respect to this new error.
for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i))
@@ -74,41 +74,52 @@ void ErrorCheck::delivered(const EventFrame& e) {
// Review a frame in the queue with respect to the current error.
ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) {
FrameQueue::iterator next = i+1;
- if (isUnresolved()) {
- const ClusterErrorCheckBody* errorCheck = 0;
- if (i->frame.getBody())
- errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(
- i->frame.getMethod());
- if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+ if(!isUnresolved() || !i->frame.getBody() || !i->frame.getMethod())
+ return next; // Only interested in control frames while unresolved.
+ const AMQMethodBody* method = i->frame.getMethod();
+ if (method->isA<const ClusterErrorCheckBody>()) {
+ const ClusterErrorCheckBody* errorCheck =
+ static_cast<const ClusterErrorCheckBody*>(method);
+
+ if (errorCheck->getFrameSeq() == frameSeq) { // Addresses current error
next = frames.erase(i); // Drop matching error check controls
if (errorCheck->getType() < type) { // my error is worse than his
QPID_LOG(critical, cluster << " error " << frameSeq
<< " did not occur on " << i->getMemberId());
- throw Exception("Aborted by failure that did not occur on all replicas");
+ throw Exception(QPID_MSG("Error " << frameSeq
+ << " did not occur on all members"));
}
else { // his error is worse/same as mine.
- QPID_LOG(notice, cluster << " error " << frameSeq
+ QPID_LOG(info, cluster << " error " << frameSeq
<< " resolved with " << i->getMemberId());
unresolved.erase(i->getMemberId());
checkResolved();
}
}
- else {
- const ClusterConfigChangeBody* configChange = 0;
- if (i->frame.getBody())
- configChange = dynamic_cast<const ClusterConfigChangeBody*>(
- i->frame.getMethod());
- if (configChange) {
- MemberSet members(ClusterMap::decode(configChange->getCurrent()));
- QPID_LOG(debug, cluster << " apply config change to unresolved: "
- << members);
- MemberSet intersect;
- set_intersection(members.begin(), members.end(),
- unresolved.begin(), unresolved.end(),
- inserter(intersect, intersect.begin()));
- unresolved.swap(intersect);
- checkResolved();
- }
+ else if (errorCheck->getFrameSeq() < frameSeq && errorCheck->getType() != NONE
+ && i->connectionId.getMember() != cluster.getId())
+ {
+ // This error occured before the current error so we
+ // have processed past it.
+ next = frames.erase(i); // Drop the error check control
+ respondNone(i->connectionId.getMember(), errorCheck->getType(),
+ errorCheck->getFrameSeq());
+ }
+ // if errorCheck->getFrameSeq() > frameSeq then leave it in the queue.
+ }
+ else if (method->isA<const ClusterConfigChangeBody>()) {
+ const ClusterConfigChangeBody* configChange =
+ static_cast<const ClusterConfigChangeBody*>(method);
+ if (configChange) {
+ MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+ QPID_LOG(debug, cluster << " apply config change to error "
+ << frameSeq << ": " << members);
+ MemberSet intersect;
+ set_intersection(members.begin(), members.end(),
+ unresolved.begin(), unresolved.end(),
+ inserter(intersect, intersect.begin()));
+ unresolved.swap(intersect);
+ checkResolved();
}
}
return next;
@@ -117,10 +128,10 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator&
void ErrorCheck::checkResolved() {
if (unresolved.empty()) { // No more potentially conflicted members, we're clear.
type = ERROR_TYPE_NONE;
- QPID_LOG(notice, cluster << " error " << frameSeq << " resolved.");
+ QPID_LOG(info, cluster << " error " << frameSeq << " resolved.");
}
else
- QPID_LOG(notice, cluster << " error " << frameSeq
+ QPID_LOG(info, cluster << " error " << frameSeq
<< " must be resolved with " << unresolved);
}
@@ -131,4 +142,15 @@ EventFrame ErrorCheck::getNext() {
return e;
}
+void ErrorCheck::respondNone(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq) {
+ // Don't respond to non-errors or to my own errors.
+ if (type == ERROR_TYPE_NONE || from == cluster.getId())
+ return;
+ QPID_LOG(info, cluster << " error " << frameSeq << " did not occur locally.");
+ mcast.mcastControl(
+ ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq),
+ cluster.getId()
+ );
+}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h
index 550cd36b75..09028391ac 100644
--- a/qpid/cpp/src/qpid/cluster/ErrorCheck.h
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h
@@ -25,6 +25,7 @@
#include "qpid/cluster/types.h"
#include "qpid/cluster/Multicaster.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/SequenceNumber.h"
#include <boost/function.hpp>
#include <deque>
#include <set>
@@ -49,11 +50,12 @@ class ErrorCheck
public:
typedef std::set<MemberId> MemberSet;
typedef framing::cluster::ErrorType ErrorType;
+ typedef framing::SequenceNumber SequenceNumber;
ErrorCheck(Cluster&);
/** A local error has occured */
- void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&,
+ void error(Connection&, ErrorType, SequenceNumber frameSeq, const MemberSet&,
const std::string& msg);
/** Called when a frame is delivered */
@@ -66,7 +68,8 @@ class ErrorCheck
bool isUnresolved() const { return type != NONE; }
-
+ /** Respond to an error check saying we had no error. */
+ void respondNone(const MemberId&, uint8_t type, SequenceNumber frameSeq);
private:
static const ErrorType NONE = framing::cluster::ERROR_TYPE_NONE;
@@ -78,7 +81,7 @@ class ErrorCheck
Multicaster& mcast;
FrameQueue frames;
MemberSet unresolved;
- uint64_t frameSeq;
+ SequenceNumber frameSeq;
ErrorType type;
Connection* connection;
};
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index 717e1a47cd..190eeb7293 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -19,21 +19,21 @@
*
*/
+#include "qpid/broker/Message.h"
#include "qpid/cluster/ExpiryPolicy.h"
#include "qpid/cluster/Multicaster.h"
#include "qpid/framing/ClusterMessageExpiredBody.h"
#include "qpid/sys/Time.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
-ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t)
+ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
: expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
-struct ExpiryTask : public broker::TimerTask {
+struct ExpiryTask : public sys::TimerTask {
ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
: TimerTask(when), expiryPolicy(policy), expiryId(id) {}
void fire() { expiryPolicy->sendExpire(expiryId); }
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
index 4ec41c93bc..bdbe3a61dc 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -33,10 +33,13 @@
namespace qpid {
namespace broker {
-class Timer;
class Message;
}
+namespace sys {
+class Timer;
+}
+
namespace cluster {
class Multicaster;
@@ -46,7 +49,7 @@ class Multicaster;
class ExpiryPolicy : public broker::ExpiryPolicy
{
public:
- ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&);
+ ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&);
void willExpire(broker::Message&);
bool hasExpired(broker::Message&);
@@ -78,7 +81,7 @@ class ExpiryPolicy : public broker::ExpiryPolicy
boost::intrusive_ptr<Expired> expiredPolicy;
Multicaster& mcast;
MemberId memberId;
- broker::Timer& timer;
+ sys::Timer& timer;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h
index cae506fc06..29e368b671 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.h
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.h
@@ -46,8 +46,8 @@ class AMQFrame : public AMQDataBlock
QPID_COMMON_EXTERN AMQBody* getBody();
QPID_COMMON_EXTERN const AMQBody* getBody() const;
- AMQMethodBody* getMethod() { return getBody()->getMethod(); }
- const AMQMethodBody* getMethod() const { return getBody()->getMethod(); }
+ AMQMethodBody* getMethod() { return getBody() ? getBody()->getMethod() : 0; }
+ const AMQMethodBody* getMethod() const { return getBody() ? getBody()->getMethod() : 0; }
void setMethod(ClassId c, MethodId m);
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 3ea38ca45e..2df10b1e95 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -62,7 +62,6 @@ ManagementAgent::ManagementAgent () :
ManagementAgent::~ManagementAgent ()
{
- timer.stop();
{
Mutex::ScopedLock lock (userLock);
@@ -90,9 +89,10 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,
dataDir = _dataDir;
interval = _interval;
broker = _broker;
+ timer = &_broker->getTimer();
threadPoolSize = _threads;
ManagementObject::maxThreads = threadPoolSize;
- timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
+ timer->add (new Periodic(*this, interval));
// Get from file or generate and save to file.
if (dataDir.empty())
@@ -219,7 +219,7 @@ ManagementAgent::Periodic::~Periodic () {}
void ManagementAgent::Periodic::fire ()
{
- agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
+ agent.timer->add (new Periodic (agent, agent.interval));
agent.periodicProcessing ();
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index c9c9b3f66d..84e84e3daa 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -24,9 +24,9 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
-#include "qpid/broker/Timer.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Timer.h"
#include "qpid/broker/ConnectionToken.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/ManagementEvent.h"
@@ -98,7 +98,7 @@ public:
void disallow(const std::string& className, const std::string& methodName, const std::string& message);
private:
- struct Periodic : public qpid::broker::TimerTask
+ struct Periodic : public qpid::sys::TimerTask
{
ManagementAgent& agent;
@@ -183,12 +183,12 @@ private:
framing::Uuid uuid;
sys::Mutex addLock;
sys::Mutex userLock;
- qpid::broker::Timer timer;
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
std::string dataDir;
uint16_t interval;
qpid::broker::Broker* broker;
+ qpid::sys::Timer* timer;
uint16_t bootSequence;
uint32_t nextObjectId;
uint32_t brokerBank;
diff --git a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
index 1a3ce1c069..b7d52372f4 100644
--- a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
+++ b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
@@ -72,6 +72,7 @@ void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueu
FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
+ headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position);
route(msg);
}
diff --git a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
index c0cc36efe3..b5911bb71e 100644
--- a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
+++ b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
@@ -83,15 +83,29 @@ void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable
std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
Queue::shared_ptr queue = queues.find(queueName);
if (queue) {
- FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
- headers.erase(REPLICATION_TARGET_QUEUE);
- headers.erase(REPLICATION_EVENT_SEQNO);
- headers.erase(REPLICATION_EVENT_TYPE);
- msg.deliverTo(queue);
- QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes();
- mgmtExchange->inc_byteRoutes( msg.contentSize());
+
+ SequenceNumber seqno1(args->getAsInt(QUEUE_MESSAGE_POSITION));
+
+ // note that queue will ++ before enqueue.
+ if (queue->getPosition() > --seqno1) // test queue.pos < seqnumber
+ {
+ QPID_LOG(error, "Cannot enqueue replicated message. Destination Queue " << queueName << " ahead of source queue");
+ mgmtExchange->inc_msgDrops();
+ mgmtExchange->inc_byteDrops(msg.contentSize());
+ } else {
+ queue->setPosition(seqno1);
+
+ FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
+ headers.erase(REPLICATION_TARGET_QUEUE);
+ headers.erase(REPLICATION_EVENT_SEQNO);
+ headers.erase(REPLICATION_EVENT_TYPE);
+ headers.erase(QUEUE_MESSAGE_POSITION);
+ msg.deliverTo(queue);
+ QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_msgRoutes();
+ mgmtExchange->inc_byteRoutes( msg.contentSize());
+ }
}
} else {
QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist");
diff --git a/qpid/cpp/src/qpid/replication/constants.h b/qpid/cpp/src/qpid/replication/constants.h
index fb7085c570..c5ba7d3d6a 100644
--- a/qpid/cpp/src/qpid/replication/constants.h
+++ b/qpid/cpp/src/qpid/replication/constants.h
@@ -26,6 +26,7 @@ const std::string REPLICATION_EVENT_TYPE("qpid.replication.type");
const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno");
const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue");
const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message");
+const std::string QUEUE_MESSAGE_POSITION("qpid.replication.queue.position");
const int ENQUEUE(1);
const int DEQUEUE(2);
diff --git a/qpid/cpp/src/qpid/sys/Timer.cpp b/qpid/cpp/src/qpid/sys/Timer.cpp
index d77f4a0b1e..5ed117db93 100644
--- a/qpid/cpp/src/qpid/sys/Timer.cpp
+++ b/qpid/cpp/src/qpid/sys/Timer.cpp
@@ -20,7 +20,8 @@
*/
#include "qpid/sys/Timer.h"
#include "qpid/sys/Mutex.h"
-#include <iostream>
+#include "qpid/log/Statement.h"
+
#include <numeric>
using boost::intrusive_ptr;
@@ -58,6 +59,8 @@ void TimerTask::setupNextFire() {
if (period && readyToFire()) {
nextFireTime = AbsTime(nextFireTime, period);
cancelled = false;
+ } else {
+ QPID_LOG(error, "Couldn't setup next timer firing: " << Duration(nextFireTime, AbsTime::now()) << "[" << period << "]");
}
}
@@ -91,24 +94,39 @@ void Timer::run()
} else {
intrusive_ptr<TimerTask> t = tasks.top();
tasks.pop();
+ assert(!(t->nextFireTime < t->sortTime));
+
+ // warn on extreme lateness
+ AbsTime start(AbsTime::now());
+ Duration late(t->sortTime, start);
+ if (late > 500 * TIME_MSEC) {
+ QPID_LOG(warning, "Timer delayed by " << late / TIME_MSEC << "ms");
+ }
{
ScopedLock<Mutex> l(t->callbackLock);
if (t->cancelled) {
continue;
- } else if(t->readyToFire()) {
+ } else if(Duration(t->nextFireTime, start) >= 0) {
Monitor::ScopedUnlock u(monitor);
t->fireTask();
+ // Warn on callback overrun
+ AbsTime end(AbsTime::now());
+ Duration overrun(tasks.top()->nextFireTime, end);
+ if (overrun > 1 * TIME_MSEC) {
+ QPID_LOG(warning,
+ "Timer callback overran by " << overrun / TIME_MSEC << "ms [taking "
+ << Duration(start, end) << "]");
+ }
continue;
} else {
// If the timer was adjusted into the future it might no longer
// be the next event, so push and then get top to make sure
// You can only push events into the future
- assert(!(t->nextFireTime < t->sortTime));
t->sortTime = t->nextFireTime;
tasks.push(t);
}
}
- monitor.wait(tasks.top()->nextFireTime);
+ monitor.wait(tasks.top()->sortTime);
}
}
}
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 16cdfcbd72..563068a018 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -65,6 +65,7 @@ unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \
$(lib_client) $(lib_broker) $(lib_console)
unit_test_SOURCES= unit_test.cpp unit_test.h \
+ ClientSessionTest.cpp \
BrokerFixture.h SocketProxy.h \
exception_test.cpp \
RefCounted.cpp \
@@ -75,7 +76,6 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
QueueOptionsTest.cpp \
InlineAllocator.cpp \
InlineVector.cpp \
- ClientSessionTest.cpp \
SequenceSet.cpp \
StringUtils.cpp \
IncompleteMessageList.cpp \
diff --git a/qpid/cpp/src/tests/PartialFailure.cpp b/qpid/cpp/src/tests/PartialFailure.cpp
index 7169e53a16..f77a1401f8 100644
--- a/qpid/cpp/src/tests/PartialFailure.cpp
+++ b/qpid/cpp/src/tests/PartialFailure.cpp
@@ -82,10 +82,31 @@ void queueAndSub(Client& c) {
c.subs.subscribe(c.lq, c.name);
}
+// Handle near-simultaneous errors
+QPID_AUTO_TEST_CASE(testCoincidentErrors) {
+ ClusterFixture cluster(2, updateArgs, -1);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+
+ c0.session.queueDeclare("q", durable=true);
+ {
+ ScopedSuppressLogging allQuiet;
+ async(c0.session).messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception]", "q"));
+ async(c1.session).messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception]", "q"));
+
+ int alive=0;
+ try { Client c00(cluster[0], "c00"); ++alive; } catch (...) {}
+ try { Client c11(cluster[1], "c11"); ++alive; } catch (...) {}
+
+ BOOST_CHECK_EQUAL(alive, 1);
+ }
+}
+
+#if 0 // FIXME aconway 2009-07-30:
// Verify normal cluster-wide errors.
QPID_AUTO_TEST_CASE(testNormalErrors) {
// FIXME aconway 2009-04-10: Would like to put a scope just around
- // the statements expected to fail (in BOOST_CHECK_THROW) but that
+ // the statements expected to fail (in BOOST_CHECK_yTHROW) but that
// sproadically lets out messages, possibly because they're in
// Connection thread.
@@ -96,7 +117,7 @@ QPID_AUTO_TEST_CASE(testNormalErrors) {
{
ScopedSuppressLogging allQuiet;
- queueAndSub(c0);
+ queueAndsub(c0);
c0.session.messageTransfer(content=Message("x", "c0"));
BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
@@ -234,5 +255,5 @@ QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) {
}
}
#endif
-
+#endif // FIXME aconway 2009-07-30:
QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index be4a87d2ab..b70afa52a7 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -19,6 +19,7 @@
*
*/
#include "unit_test.h"
+#include "test_tools.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
@@ -275,11 +276,13 @@ class TestMessageStoreOC : public NullMessageStore
uint enqCnt;
uint deqCnt;
+ bool error;
virtual void dequeue(TransactionContext*,
const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
const PersistableQueue& /*queue*/)
{
+ if (error) throw Exception("Dequeue error test");
deqCnt++;
}
@@ -287,10 +290,16 @@ class TestMessageStoreOC : public NullMessageStore
const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
const PersistableQueue& /* queue */)
{
+ if (error) throw Exception("Enqueue error test");
enqCnt++;
}
- TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0) {}
+ void createError()
+ {
+ error=true;
+ }
+
+ TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {}
~TestMessageStoreOC(){}
};
@@ -689,6 +698,30 @@ not requeued to the store.
}
-QPID_AUTO_TEST_SUITE_END()
+QPID_AUTO_TEST_CASE(testLastNodeJournalError){
+/*
+simulate store excption going into last node standing
+
+*/
+ TestMessageStoreOC testStore;
+ client::QueueOptions args;
+ // set queue mode
+ args.setPersistLastNode();
+
+ Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
+ intrusive_ptr<Message> received;
+ queue1->configure(args);
+
+ // check requeue 1
+ intrusive_ptr<Message> msg1 = create_message("e", "C");
+
+ queue1->deliver(msg1);
+ testStore.createError();
+
+ ScopedSuppressLogging sl; // Suppress messages for expected errors.
+ queue1->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
+
+}QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/allSegmentTypes.h b/qpid/cpp/src/tests/allSegmentTypes.h
deleted file mode 100644
index e942250c89..0000000000
--- a/qpid/cpp/src/tests/allSegmentTypes.h
+++ /dev/null
@@ -1,128 +0,0 @@
-#ifndef TESTS_ALLSEGMENTTYPES_H
-#define TESTS_ALLSEGMENTTYPES_H
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-///
-/// This file was automatically generated from the AMQP specification.
-/// Do not edit.
-///
-
-
-#include "qpid/amqp_0_10/specification.h"
-#include "qpid/amqp_0_10/Header.h"
-#include "qpid/amqp_0_10/Body.h"
-
-using namespace qpid::amqp_0_10;
-
-template <class Op> size_t allSegmentTypes(Op& op) {
- op(Header());
- op(Body());
- op(ControlHolder(connection::Start()));
- op(ControlHolder(connection::StartOk()));
- op(ControlHolder(connection::Secure()));
- op(ControlHolder(connection::SecureOk()));
- op(ControlHolder(connection::Tune()));
- op(ControlHolder(connection::TuneOk()));
- op(ControlHolder(connection::Open()));
- op(ControlHolder(connection::OpenOk()));
- // op(ControlHolder(connection::Redirect())); // known-hosts array
- op(ControlHolder(connection::Heartbeat()));
- // op(ControlHolder(connection::Close())); // class/method dropped
- op(ControlHolder(connection::CloseOk()));
- op(ControlHolder(session::Attach()));
- op(ControlHolder(session::Attached()));
- op(ControlHolder(session::Detach()));
- op(ControlHolder(session::Detached()));
- op(ControlHolder(session::RequestTimeout()));
- op(ControlHolder(session::Timeout()));
- op(ControlHolder(session::CommandPoint()));
- // op(ControlHolder(session::Expected())); // fragments array encoding problem
- // op(ControlHolder(session::Confirmed())); // fragments array encoding problem
- op(ControlHolder(session::Completed()));
- op(ControlHolder(session::KnownCompleted()));
- op(ControlHolder(session::Flush()));
- op(ControlHolder(session::Gap()));
- // FIXME aconway 2008-04-15: command encoding, fix headers, fix sized structs.
- op(CommandHolder(execution::Sync()));
- op(CommandHolder(execution::Result()));
-
- // FIXME aconway 2008-04-16: investigate remaining failures.
- // op(CommandHolder(execution::Exception()));
- op(CommandHolder(message::Transfer()));
- op(CommandHolder(message::Accept()));
- // op(CommandHolder(message::Reject()));
- op(CommandHolder(message::Release()));
- op(CommandHolder(message::Acquire()));
- // op(CommandHolder(message::Resume()));
- op(CommandHolder(message::Subscribe()));
- op(CommandHolder(message::Cancel()));
- op(CommandHolder(message::SetFlowMode()));
- op(CommandHolder(message::Flow()));
- op(CommandHolder(message::Flush()));
- op(CommandHolder(message::Stop()));
- op(CommandHolder(tx::Select()));
- op(CommandHolder(tx::Commit()));
- op(CommandHolder(tx::Rollback()));
- op(CommandHolder(dtx::Select()));
- // op(CommandHolder(dtx::Start()));
- // op(CommandHolder(dtx::End()));
- // op(CommandHolder(dtx::Commit()));
- // op(CommandHolder(dtx::Forget()));
- // op(CommandHolder(dtx::GetTimeout()));
- // op(CommandHolder(dtx::Prepare()));
- // op(CommandHolder(dtx::Recover()));
- // op(CommandHolder(dtx::Rollback()));
- // op(CommandHolder(dtx::SetTimeout()));
- op(CommandHolder(exchange::Declare()));
- op(CommandHolder(exchange::Delete()));
- op(CommandHolder(exchange::Query()));
- op(CommandHolder(exchange::Bind()));
- op(CommandHolder(exchange::Unbind()));
- op(CommandHolder(exchange::Bound()));
- op(CommandHolder(queue::Declare()));
- op(CommandHolder(queue::Delete()));
- op(CommandHolder(queue::Purge()));
- op(CommandHolder(queue::Query()));
- // op(CommandHolder(file::Qos()));
- // op(CommandHolder(file::QosOk()));
-// op(CommandHolder(file::Consume()));
-// op(CommandHolder(file::ConsumeOk()));
-// op(CommandHolder(file::Cancel()));
-// op(CommandHolder(file::Open()));
-// op(CommandHolder(file::OpenOk()));
-// op(CommandHolder(file::Stage()));
-// op(CommandHolder(file::Publish()));
-// op(CommandHolder(file::Return()));
-// op(CommandHolder(file::Deliver()));
-// op(CommandHolder(file::Ack()));
-// op(CommandHolder(file::Reject()));
-// op(CommandHolder(stream::Qos()));
-// op(CommandHolder(stream::QosOk()));
-// op(CommandHolder(stream::Consume()));
-// op(CommandHolder(stream::ConsumeOk()));
-// op(CommandHolder(stream::Cancel()));
-// op(CommandHolder(stream::Publish()));
-// op(CommandHolder(stream::Return()));
-// op(CommandHolder(stream::Deliver()));
- return 0;
-}
-#endif /*!TESTS_ALLSEGMENTTYPES_H*/
diff --git a/qpid/cpp/src/tests/replication_test b/qpid/cpp/src/tests/replication_test
index 6e0c1c8d3b..8b3022b260 100755
--- a/qpid/cpp/src/tests/replication_test
+++ b/qpid/cpp/src/tests/replication_test
@@ -56,10 +56,12 @@ if test -d ${PYTHON_DIR} && test -f ../.libs/replicating_listener.so && test -f
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-b --generate-queue-events 2
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-c --generate-queue-events 1
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 2
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-e --generate-queue-events 1
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-b
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e
#queue-d deliberately not declared on DR; this error case should be handled
#publish and consume from test queues on broker A:
@@ -89,6 +91,12 @@ if test -d ${PYTHON_DIR} && test -f ../.libs/replicating_listener.so && test -f
./receiver --port $BROKER_A --queue queue-c --messages 10 > /dev/null
./receiver --port $BROKER_A --queue queue-d > /dev/null
+
+ # What we are doing is putting a message on the end of repliaction queue & waiting for it on remote side
+ # making sure all the messages have been flushed from the replication queue.
+ echo dummy | ./sender --port $BROKER_A --routing-key queue-e --send-eos 1
+ ./receiver --port $BROKER_B --queue queue-e --messages 1 > /dev/null
+
#shutdown broker A then check that broker Bs versions of the queues are as expected
../qpidd -q --port $BROKER_A
unset BROKER_A
@@ -98,7 +106,6 @@ if test -d ${PYTHON_DIR} && test -f ../.libs/replicating_listener.so && test -f
./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl
./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl
- stop_brokers
tail -5 queue-a-input.repl > queue-a-expected.repl
tail -10 queue-b-input.repl > queue-b-expected.repl
@@ -108,6 +115,60 @@ if test -d ${PYTHON_DIR} && test -f ../.libs/replicating_listener.so && test -f
grep 'queue-d does not exist' replication-dest.log > /dev/null || echo "WARNING: Expected error to be logged!"
+ stop_brokers
+
+ # now check offsets working (enqueue based on position being set, not queue abs position)
+
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable info+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port
+ BROKER_A=`cat qpidd.port`
+
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port
+ BROKER_B=`cat qpidd.port`
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
+ $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-e --generate-queue-events 2
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 1
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-d
+
+ i=1
+ while [ $i -le 10 ]; do
+ echo Message $i for A >> queue-e-input.repl
+ i=`expr $i + 1`
+ done
+
+ ./sender --port $BROKER_A --routing-key queue-e --send-eos 1 < queue-e-input.repl
+ ./receiver --port $BROKER_A --queue queue-e --messages 10 > /dev/null
+
+ # What we are doing is putting a message on the end of repliaction queue & waiting for it on remote side
+ # making sure all the messages have been flushed from the replication queue.
+ echo dummy | ./sender --port $BROKER_A --routing-key queue-d --send-eos 1
+ ./receiver --port $BROKER_B --queue queue-d --messages 1 > /dev/null
+
+ # now check offsets working
+ ../qpidd -q --port $BROKER_B
+ unset BROKER_B
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port
+ BROKER_B=`cat qpidd.port`
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
+ $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+ # now send another 15
+ i=11
+ while [ $i -le 15 ]; do
+ echo Message $i for A >> queue-e1-input.repl
+ i=`expr $i + 1`
+ done
+ ./sender --port $BROKER_A --routing-key queue-e --send-eos 1 < queue-e1-input.repl
+
+ ./receiver --port $BROKER_B --queue queue-e > queue-e-backup.repl
+ diff queue-e-backup.repl queue-e1-input.repl || FAIL=1
+
+ stop_brokers
+
if [ x$FAIL != x ]; then
echo replication test failed: expectations not met!
exit 1
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 92917dcfa6..1e5b091a87 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -68,7 +68,7 @@
<!-- Check for error consistency across the cluster -->
<control name="error-check" code="0x14">
<field name="type" type="error-type"/>
- <field name="frame-seq" type="uint64"/>
+ <field name="frame-seq" type="sequence-no"/>
</control>
@@ -170,7 +170,7 @@
<control name="membership" code="0x21" label="Cluster membership details.">
<field name="joiners" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
- <field name="frame-seq" type="uint64"/> <!-- frame sequence number -->
+ <field name="frame-seq" type="sequence-no"/> <!-- frame sequence number -->
</control>
<!-- Updater cannot fulfill an update offer. -->
diff --git a/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm b/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm
index 18d90fab29..bcf7db345b 100644
--- a/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm
+++ b/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm
@@ -39,12 +39,14 @@ public class ProtocolVersion implements Comparable
{
private final byte _majorVersion;
private final byte _minorVersion;
+ private final String _stringFormat;
public ProtocolVersion(byte majorVersion, byte minorVersion)
{
_majorVersion = majorVersion;
_minorVersion = minorVersion;
+ _stringFormat = _majorVersion+"-"+_minorVersion;
}
public byte getMajorVersion()
@@ -57,6 +59,10 @@ public class ProtocolVersion implements Comparable
return _minorVersion;
}
+ public String toString()
+ {
+ return _stringFormat;
+ }
public int compareTo(Object o)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index c6e370206e..e963fb23ea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -58,6 +58,12 @@ import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.logging.actors.AMQPChannelActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
public class AMQChannel
{
@@ -112,13 +118,22 @@ public class AMQChannel
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
- private boolean _closing;
+ private boolean _closing;
+
+ private LogActor _actor;
+ private LogSubject _logSubject;
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
_session = session;
_channelId = channelId;
+
+ _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
+ _logSubject = new ChannelLogSubject(this);
+
+ _actor.message(ChannelMessages.CHN_1001());
+
_storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
@@ -386,6 +401,8 @@ public class AMQChannel
private void setClosing(boolean closing)
{
_closing = closing;
+
+ CurrentActor.get().message(_logSubject, ChannelMessages.CHN_1003());
}
private void unsubscribeAllConsumers() throws AMQException
@@ -789,6 +806,8 @@ public class AMQChannel
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
+ _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started"));
+
if (wasSuspended)
{
// may need to deliver queued messages
@@ -891,6 +910,8 @@ public class AMQChannel
public void setCredit(final long prefetchSize, final int prefetchCount)
{
+ //fixme
+// _actor.message(ChannelMessages.CHN_100X(prefetchSize, prefetchCount);
_creditManager.setCreditLimits(prefetchSize, prefetchCount);
}
@@ -942,4 +963,9 @@ public class AMQChannel
{
return _recordDeliveryMethod;
}
+
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index e56f1cda12..90b4590d4c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -256,7 +256,7 @@ public class ServerConfiguration implements SignalHandler
// Our configuration class needs to make the interpolate method
// public so it can be called below from the config method.
- private static class MyConfiguration extends CompositeConfiguration
+ public static class MyConfiguration extends CompositeConfiguration
{
public String interpolate(String obj)
{
@@ -264,7 +264,7 @@ public class ServerConfiguration implements SignalHandler
}
}
- private final static Configuration flatConfig(File file) throws ConfigurationException
+ public final static Configuration flatConfig(File file) throws ConfigurationException
{
// We have to override the interpolate methods so that
// interpolation takes place accross the entirety of the
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
index 203a5d160d..d5683b3c7b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
@@ -40,4 +40,21 @@ public interface LogActor
* @param message The message to log
*/
public void message(LogSubject subject, LogMessage message);
-} \ No newline at end of file
+
+ /**
+ * Logs the specified LogMessage against this actor
+ *
+ * Currently logging has a global setting however this will later be revised and
+ * as such the LogActor will need to take into consideration any new configuration
+ * as a means of enabling the logging of LogActors and LogSubjects.
+ *
+ * @param message The message to log
+ */
+ public void message(LogMessage message);
+
+ /**
+ *
+ * @return the RootMessageLogger that is currently in use by this LogActor.
+ */
+ RootMessageLogger getRootMessageLogger();
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
index cd7992faa7..5ac5eab6c4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
@@ -37,6 +37,14 @@ public interface RootMessageLogger
*/
boolean isMessageEnabled(LogActor actor, LogSubject subject);
+ /**
+ * Determine if the LogActor should be generating log messages.
+ *
+ * @param actor The actor requesting the logging
+ *
+ * @return boolean true if the message should be logged.
+ */
+ boolean isMessageEnabled(LogActor actor);
/**
* Log the raw message to the configured logger.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
index 1c2b4e4046..a3bf276d1e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
@@ -40,6 +40,11 @@ public class RootMessageLoggerImpl implements RootMessageLogger
return _enabled;
}
+ public boolean isMessageEnabled(LogActor actor)
+ {
+ return _enabled;
+ }
+
public void rawMessage(String message)
{
_rawLogger.rawMessage(MESSAGE + message);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
index 95f2dc9ff6..4502710dd6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
@@ -32,6 +32,10 @@ public abstract class AbstractActor implements LogActor
public AbstractActor(RootMessageLogger rootLogger)
{
+ if(rootLogger == null)
+ {
+ throw new NullPointerException("RootMessageLogger cannot be null");
+ }
_rootLogger = rootLogger;
}
@@ -42,4 +46,18 @@ public abstract class AbstractActor implements LogActor
_rootLogger.rawMessage(_logString + String.valueOf(subject) + message);
}
}
+
+ public void message(LogMessage message)
+ {
+ if (_rootLogger.isMessageEnabled(this))
+ {
+ _rootLogger.rawMessage(_logString + message);
+ }
+ }
+
+ public RootMessageLogger getRootMessageLogger()
+ {
+ return _rootLogger;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
index 8e9ee3720c..24df17683f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
@@ -235,12 +235,12 @@ MST-1006 = Recovery Complete[ : {0}]
#Connection
# 0 - Client id
# 1 - Protocol Version
-CON-1001 = Open : Client ID {0}[ : Protocol Version : {1}]
+CON-1001 = Open[ : Client ID : {0}][ : Protocol Version : {1}]
CON-1002 = Close
#Channel
-# 0 - count
-CHN-1001 = Create : Prefetch {0, number}
+CHN-1001 = Create
+# : Prefetch Size {0,number} : Count {1,number}
# 0 - flow
CHN-1002 = Flow {0}
CHN-1003 = Close
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
index 3774155626..f996576f31 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
@@ -41,6 +41,7 @@ public class Log4jMessageLogger implements RawMessageLogger
_level = Level.toLevel(level);
_rawMessageLogger = Logger.getLogger(logger);
+ _rawMessageLogger.setLevel(_level);
}
public void rawMessage(String message)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java
new file mode 100644
index 0000000000..28d64de74e
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
+
+public class MessagesStoreLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the MessagesStoreLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost Name
+ * 1 - Message Store Type
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/ms({1})";
+
+ /** Create an ExchangeLogSubject that Logs in the following format. */
+ public MessagesStoreLogSubject(VirtualHost vhost, MessageStore store)
+ {
+ setLogStringWithFormat(BINDING_FORMAT, vhost.getName(),
+ store.getClass().getSimpleName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 5f1615351b..2a6cab6048 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -54,6 +54,10 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -139,6 +143,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private final long _sessionID = idGenerator.getAndIncrement();
private AMQPConnectionActor _actor;
+ private LogSubject _logSubject;
public ManagedObject getManagedObject()
{
@@ -156,6 +161,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
+ _actor.message(ConnectionMessages.CON_1001(null, null, false, false));
+
try
{
IoServiceConfig config = session.getServiceConfig();
@@ -171,18 +178,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
- AMQStateManager stateManager) throws AMQException
- {
- _stateManager = stateManager;
- _minaProtocolSession = session;
- session.setAttachment(this);
-
- _codecFactory = codecFactory;
-
- _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
- }
-
private AMQProtocolSessionMBean createMBean() throws AMQException
{
try
@@ -211,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _sessionID;
}
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -236,42 +236,54 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
int channelId = frame.getChannel();
AMQBody body = frame.getBodyFrame();
- if (_logger.isDebugEnabled())
+ //Look up the Channel's Actor and set that as the current actor
+ // If that is not available then we can use the ConnectionActor
+ // that is associated with this AMQMPSession.
+ LogActor channelActor = null;
+ if (_channelMap.get(channelId) != null)
{
- _logger.debug("Frame Received: " + frame);
+ channelActor = _channelMap.get(channelId).getLogActor();
}
+ CurrentActor.set(channelActor == null ? _actor : channelActor);
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
+ try
{
- if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+ if (_logger.isDebugEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
- }
+ _logger.debug("Frame Received: " + frame);
}
- else
+
+ // Check that this channel is not closing
+ if (channelAwaitingClosure(channelId))
{
- if (_logger.isInfoEnabled())
+ if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
{
- _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+ }
}
+ else
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
+ }
- closeProtocolSession();
- return;
+ closeProtocolSession();
+ return;
+ }
}
- }
- CurrentActor.set(_actor);
- try
- {
- body.handle(channelId, this);
- }
- catch (AMQException e)
- {
- closeChannel(channelId);
- throw e;
+ try
+ {
+ body.handle(channelId, this);
+ }
+ catch (AMQException e)
+ {
+ closeChannel(channelId);
+ throw e;
+ }
}
finally
{
@@ -285,6 +297,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
try
{
+ // Log incomming protocol negotiation request
+ _actor.message(ConnectionMessages.CON_1001(null, pi._protocolMajor + "-" + pi._protocolMinor, false, true));
+
ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
// This sets the protocol version (and hence framing classes) for this session.
@@ -643,6 +658,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
if (!_closed)
{
_closed = true;
+
+ _actor.message(ConnectionMessages.CON_1002());
if (_virtualHost != null)
{
@@ -770,7 +787,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)
{
- setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
+ String clientID = _clientProperties.getString(CLIENT_PROPERTIES_INSTANCE);
+ setContextKey(new AMQShortString(clientID));
+
+ // Log the Opening of the connection for this client
+ _actor.message(ConnectionMessages.CON_1001(clientID, _protocolVersion.toString(), true, true));
}
if (_clientProperties.getString(ClientProperties.version.toString()) != null)
@@ -829,6 +850,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_virtualHost = virtualHost;
_actor.virtualHostSelected(this);
+ _logSubject = new ConnectionLogSubject(this);
_virtualHost.getConnectionRegistry().registerConnection(this);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index b15e6a4219..21fd1ce6b4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -29,6 +29,8 @@ import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -39,6 +41,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin
{
long getSessionID();
+ LogActor getLogActor();
+
public static final class ProtocolSessionIdentifier
{
private final Object _sessionIdentifier;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 3e1477fc1c..e86c85e0e4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -65,6 +65,8 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
@@ -185,6 +187,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
*/
public void commitTransactions(int channelId) throws JMException
{
+ CurrentActor.set(getLogActor());
try
{
AMQChannel channel = _session.getChannel(channelId);
@@ -199,6 +202,10 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
{
throw new MBeanException(ex, ex.toString());
}
+ finally
+ {
+ CurrentActor.remove();
+ }
}
/**
@@ -209,6 +216,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
*/
public void rollbackTransactions(int channelId) throws JMException
{
+ CurrentActor.set(getLogActor());
try
{
AMQChannel channel = _session.getChannel(channelId);
@@ -223,6 +231,10 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
{
throw new MBeanException(ex, ex.toString());
}
+ finally
+ {
+ CurrentActor.remove();
+ }
}
/**
@@ -269,18 +281,38 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
0,
0);
- _session.writeFrame(responseBody.generateFrame(0));
-
+ CurrentActor.set(getLogActor());
try
{
- _session.closeSession();
+ _session.writeFrame(responseBody.generateFrame(0));
+
+ try
+ {
+
+ _session.closeSession();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
}
- catch (AMQException ex)
+ finally
{
- throw new MBeanException(ex, ex.toString());
+ CurrentActor.remove();
}
}
+ /**
+ * Return the LogActor for this MBean Session
+ * //fixme currently simply returning the managed sessions LogActor, should
+ * be the ManagementActor
+ * @return
+ */
+ private LogActor getLogActor()
+ {
+ return _session.getLogActor();
+ }
+
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 649e84cb50..ad1e8a580e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -88,7 +88,7 @@ public class VirtualHost implements Accessable
private final Timer _houseKeepingTimer;
private VirtualHostConfiguration _configuration;
-
+
public void setAccessableName(String name)
{
_logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
@@ -386,11 +386,6 @@ public class VirtualHost implements Accessable
return _exchangeFactory;
}
- public ApplicationRegistry getApplicationRegistry()
- {
- throw new UnsupportedOperationException();
- }
-
public MessageStore getMessageStore()
{
return _messageStore;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index ef345f45c4..28f5d417ff 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.log4j.Logger;
@@ -475,14 +476,14 @@ public class AbstractHeadersExchangeTestBase extends TestCase
new LinkedList<RequiredDeliveryException>()
);
- Message(String id, String... headers) throws AMQException
+ Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
{
- this(id, getHeaders(headers));
+ this(protocolSession, id, getHeaders(headers));
}
- Message(String id, FieldTable headers) throws AMQException
+ Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException
{
- this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+ this(protocolSession, _messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
}
public IncomingMessage getIncomingMessage()
@@ -490,7 +491,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return _incoming;
}
- private Message(long messageId,
+ private Message(AMQProtocolSession protocolsession, long messageId,
MessagePublishInfo publish,
ContentHeaderBody header,
List<ContentBody> bodies) throws AMQException
@@ -499,7 +500,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
- _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
+ _incoming = new TestIncomingMessage(getMessageId(),publish, _txnContext, protocolsession);
_incoming.setContentHeaderBody(header);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index a26f7f16d7..27ef04431b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -57,7 +57,7 @@ public class DestWildExchangeTest extends TestCase
_vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
_store = new MemoryMessageStore();
_context = new StoreContext();
- _protocolSession = new InternalTestProtocolSession();
+ _protocolSession = new InternalTestProtocolSession(_vhost);
}
public void tearDown()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index fd11ddeae2..3c6abd0ad9 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -23,14 +23,21 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
-import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
{
+ AMQProtocolSession _protocolSession;
+
protected void setUp() throws Exception
{
super.setUp();
ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
+ // Just use the first vhost.
+ VirtualHost virtualHost = ApplicationRegistry.getInstance(1).getVirtualHostRegistry().getVirtualHosts().iterator().next();
+ _protocolSession = new InternalTestProtocolSession(virtualHost);
}
protected void tearDown()
@@ -49,21 +56,21 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
TestQueue q7 = bindDefault("F0000", "F0001=Bear");
TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
- routeAndTest(new Message("Message1", "F0000"), q1);
- routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2);
- routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
- routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
- routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
+ routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1);
+ routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2);
+ routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
+ routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
+ routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"),
q1, q2, q3, q4, q5, q6, q7, q8);
- routeAndTest(new Message("Message6", "F0002"));
+ routeAndTest(new Message(_protocolSession, "Message6", "F0002"));
- Message m7 = new Message("Message7", "XXXXX");
+ Message m7 = new Message(_protocolSession, "Message7", "XXXXX");
MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
pb7.setMandatory(true);
routeAndTest(m7,true);
- Message m8 = new Message("Message8", "F0000");
+ Message m8 = new Message(_protocolSession, "Message8", "F0000");
MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
pb8.setMandatory(true);
routeAndTest(m8,false,q1);
@@ -79,19 +86,19 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
- routeAndTest(new Message("Message1", "F0000"), q1, q3);
- routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
- routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message6", "F0002"));
+ routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1, q3);
+ routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2, q3, q4);
+ routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message(_protocolSession, "Message6", "F0002"));
}
public void testMandatory() throws AMQException
{
bindDefault("F0000");
- Message m1 = new Message("Message1", "XXXXX");
- Message m2 = new Message("Message2", "F0000");
+ Message m1 = new Message(_protocolSession, "Message1", "XXXXX");
+ Message m2 = new Message(_protocolSession, "Message2", "F0000");
MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
pb1.setMandatory(true);
MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
index 298e3bc22c..009699be35 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
@@ -27,11 +27,9 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.RootMessageLoggerImpl;
@@ -56,38 +54,33 @@ public class AMQPChannelActorTest extends TestCase
LogActor _amqpActor;
UnitTestMessageLogger _rawLogger;
+ AMQProtocolSession _session;
+ AMQChannel _channel;
public void setUp() throws ConfigurationException, AMQException
{
Configuration config = new PropertiesConfiguration();
ServerConfiguration serverConfig = new ServerConfiguration(config);
+ setUpWithConfig(serverConfig);
+ }
+
+ private void setUpWithConfig(ServerConfiguration serverConfig) throws AMQException
+ {
_rawLogger = new UnitTestMessageLogger();
RootMessageLogger rootLogger =
new RootMessageLoggerImpl(serverConfig, _rawLogger);
+ VirtualHost virtualHost = ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().iterator().next();
+
// Create a single session for this test.
- // Re-use is ok as we are testing the LogActor object is set correctly,
- // not the value of the output.
- AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
- // Use the first Virtualhost that has been defined to initialise
- // the MockProtocolSession. This prevents a NPE when the
- // AMQPActor attempts to lookup the name of the VHost.
- try
- {
- session.setVirtualHost(ApplicationRegistry.getInstance().
- getVirtualHostRegistry().getVirtualHosts().
- toArray(new VirtualHost[1])[0]);
- }
- catch (AMQException e)
- {
- fail("Unable to set virtualhost on session:" + e.getMessage());
- }
+ _session = new InternalTestProtocolSession(virtualHost);
- AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore());
+ _channel = new AMQChannel(_session, 1, _session.getVirtualHost().getMessageStore());
- _amqpActor = new AMQPChannelActor(channel, rootLogger);
+ _amqpActor = new AMQPChannelActor(_channel, rootLogger);
}
@@ -105,22 +98,7 @@ public class AMQPChannelActorTest extends TestCase
*/
public void testChannel()
{
- final String message = "test logging";
-
- _amqpActor.message(new LogSubject()
- {
- public String toString()
- {
- return "[AMQPActorTest]";
- }
-
- }, new LogMessage()
- {
- public String toString()
- {
- return message;
- }
- });
+ final String message = sendTestMessage();
List<Object> logs = _rawLogger.getLogMessages();
@@ -146,40 +124,12 @@ public class AMQPChannelActorTest extends TestCase
}
- public void testChannelLoggingOff() throws ConfigurationException, AMQException
+ /**
+ * Log a message using the test Actor
+ * @return the logged message
+ */
+ private String sendTestMessage()
{
- Configuration config = new PropertiesConfiguration();
- config.addProperty("status-updates", "OFF");
-
- ServerConfiguration serverConfig = new ServerConfiguration(config);
-
- _rawLogger = new UnitTestMessageLogger();
- RootMessageLogger rootLogger =
- new RootMessageLoggerImpl(serverConfig, _rawLogger);
-
- // Create a single session for this test.
- // Re-use is ok as we are testing the LogActor object is set correctly,
- // not the value of the output.
- AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
- // Use the first Virtualhost that has been defined to initialise
- // the MockProtocolSession. This prevents a NPE when the
- // AMQPActor attempts to lookup the name of the VHost.
- try
- {
- session.setVirtualHost(ApplicationRegistry.getInstance().
- getVirtualHostRegistry().getVirtualHosts().
- toArray(new VirtualHost[1])[0]);
- }
- catch (AMQException e)
- {
- fail("Unable to set virtualhost on session:" + e.getMessage());
- }
-
-
- AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore());
-
- _amqpActor = new AMQPChannelActor(channel, rootLogger);
-
final String message = "test logging";
_amqpActor.message(new LogSubject()
@@ -196,6 +146,152 @@ public class AMQPChannelActorTest extends TestCase
return message;
}
});
+ return message;
+ }
+
+ /**
+ * Test that if logging is configured to be off in the configuration that
+ * no logging is presented
+ * @throws ConfigurationException
+ * @throws AMQException
+ */
+ public void testChannelLoggingOFF() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ config.addProperty("status-updates", "OFF");
+
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+
+ setUpWithConfig(serverConfig);
+
+ sendTestMessage();
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 0, logs.size());
+
+ }
+
+ /**
+ * Test that if logging is configured to be off in the configuration that
+ * no logging is presented
+ * @throws ConfigurationException
+ * @throws AMQException
+ */
+ public void testChannelLoggingOfF() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ config.addProperty("status-updates", "OfF");
+
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+
+ setUpWithConfig(serverConfig);
+
+ sendTestMessage();
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 0, logs.size());
+
+ }
+
+ /**
+ * Test that if logging is configured to be off in the configuration that
+ * no logging is presented
+ * @throws ConfigurationException
+ * @throws AMQException
+ */
+ public void testChannelLoggingOff() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ config.addProperty("status-updates", "Off");
+
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+
+ setUpWithConfig(serverConfig);
+
+ sendTestMessage();
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 0, logs.size());
+
+ }
+
+ /**
+ * Test that if logging is configured to be off in the configuration that
+ * no logging is presented
+ * @throws ConfigurationException
+ * @throws AMQException
+ */
+ public void testChannelLoggingofF() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ config.addProperty("status-updates", "ofF");
+
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+
+ setUpWithConfig(serverConfig);
+
+ sendTestMessage();
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 0, logs.size());
+
+ }
+
+ /**
+ * Test that if logging is configured to be off in the configuration that
+ * no logging is presented
+ * @throws ConfigurationException
+ * @throws AMQException
+ */
+ public void testChannelLoggingoff() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ config.addProperty("status-updates", "off");
+
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+
+ setUpWithConfig(serverConfig);
+
+ sendTestMessage();
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 0, logs.size());
+
+ }
+
+ /**
+ * Test that if logging is configured to be off in the configuration that
+ * no logging is presented
+ * @throws ConfigurationException
+ * @throws AMQException
+ */
+ public void testChannelLoggingoFf() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ config.addProperty("status-updates", "oFf");
+
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+
+ setUpWithConfig(serverConfig);
+
+ sendTestMessage();
List<Object> logs = _rawLogger.getLogMessages();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
index c220865864..54eddf1050 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
@@ -26,18 +26,16 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
-import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.RootMessageLoggerImpl;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogActor;
import java.util.List;
@@ -56,41 +54,34 @@ public class AMQPConnectionActorTest extends TestCase
LogActor _amqpActor;
UnitTestMessageLogger _rawLogger;
- public void setUp() throws ConfigurationException
+ public void setUp() throws ConfigurationException, AMQException
{
Configuration config = new PropertiesConfiguration();
ServerConfiguration serverConfig = new ServerConfiguration(config);
+ setUpWithConfig(serverConfig);
+ }
+
+ public void tearDown()
+ {
+ _rawLogger.clearLogMessages();
+ }
+
+ private void setUpWithConfig(ServerConfiguration serverConfig) throws AMQException
+ {
_rawLogger = new UnitTestMessageLogger();
RootMessageLogger rootLogger =
new RootMessageLoggerImpl(serverConfig, _rawLogger);
+ VirtualHost virtualHost = ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().iterator().next();
+
// Create a single session for this test.
- // Re-use is ok as we are testing the LogActor object is set correctly,
- // not the value of the output.
- AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
- // Use the first Virtualhost that has been defined to initialise
- // the MockProtocolSession. This prevents a NPE when the
- // AMQPActor attempts to lookup the name of the VHost.
- try
- {
- session.setVirtualHost(ApplicationRegistry.getInstance().
- getVirtualHostRegistry().getVirtualHosts().
- toArray(new VirtualHost[1])[0]);
- }
- catch (AMQException e)
- {
- fail("Unable to set virtualhost on session:" + e.getMessage());
- }
+ AMQProtocolSession session = new InternalTestProtocolSession(virtualHost);
_amqpActor = new AMQPConnectionActor(session, rootLogger);
}
- public void tearDown()
- {
- _rawLogger.clearLogMessages();
- }
-
/**
* Test the AMQPActor logging as a Connection level.
*
@@ -98,26 +89,10 @@ public class AMQPConnectionActorTest extends TestCase
*
* The log message should be fully repalaced (no '{n}' values) and should
* not contain any channel identification.
- *
*/
public void testConnection()
{
- final String message = "test logging";
-
- _amqpActor.message(new LogSubject()
- {
- public String toString()
- {
- return "[AMQPActorTest]";
- }
-
- }, new LogMessage()
- {
- public String toString()
- {
- return message;
- }
- });
+ final String message = sendLogMessage();
List<Object> logs = _rawLogger.getLogMessages();
@@ -129,7 +104,7 @@ public class AMQPConnectionActorTest extends TestCase
// Verify that the message has the correct type
assertTrue("Message contains the [con: prefix",
- logs.get(0).toString().contains("[con:"));
+ logs.get(0).toString().contains("[con:"));
// Verify that all the values were presented to the MessageFormatter
// so we will not end up with '{n}' entries in the log.
@@ -138,11 +113,9 @@ public class AMQPConnectionActorTest extends TestCase
// Verify that the logged message does not contains the 'ch:' marker
assertFalse("Message was logged with a channel identifier." + logs.get(0),
- logs.get(0).toString().contains("/ch:"));
+ logs.get(0).toString().contains("/ch:"));
}
-
-
public void testConnectionLoggingOff() throws ConfigurationException, AMQException
{
Configuration config = new PropertiesConfiguration();
@@ -150,31 +123,18 @@ public class AMQPConnectionActorTest extends TestCase
ServerConfiguration serverConfig = new ServerConfiguration(config);
- _rawLogger = new UnitTestMessageLogger();
- RootMessageLogger rootLogger =
- new RootMessageLoggerImpl(serverConfig, _rawLogger);
+ setUpWithConfig(serverConfig);
- // Create a single session for this test.
- // Re-use is ok as we are testing the LogActor object is set correctly,
- // not the value of the output.
- AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
- // Use the first Virtualhost that has been defined to initialise
- // the MockProtocolSession. This prevents a NPE when the
- // AMQPActor attempts to lookup the name of the VHost.
- try
- {
- session.setVirtualHost(ApplicationRegistry.getInstance().
- getVirtualHostRegistry().getVirtualHosts().
- toArray(new VirtualHost[1])[0]);
- }
- catch (AMQException e)
- {
- fail("Unable to set virtualhost on session:" + e.getMessage());
- }
+ sendLogMessage();
+ List<Object> logs = _rawLogger.getLogMessages();
- _amqpActor = new AMQPConnectionActor(session, rootLogger);
+ assertEquals("Message log size not as expected.", 0, logs.size());
+
+ }
+ private String sendLogMessage()
+ {
final String message = "test logging";
_amqpActor.message(new LogSubject()
@@ -191,12 +151,7 @@ public class AMQPConnectionActorTest extends TestCase
return message;
}
});
-
- List<Object> logs = _rawLogger.getLogMessages();
-
- assertEquals("Message log size not as expected.", 0, logs.size());
-
+ return message;
}
-
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
index c1cc3253a8..e7dc0ea5da 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
@@ -26,9 +26,8 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
/**
@@ -61,27 +60,16 @@ public class CurrentActorTest extends TestCase
final Exception[] _errors = new Exception[THREADS];
// Create a single session for this test.
- AMQProtocolSession session;
+ AMQProtocolSession _session;
- public void setUp()
+ public void setUp() throws AMQException
{
// Create a single session for this test.
- // Re-use is ok as we are testing the LogActor object is set correctly,
- // not the value of the output.
- session = new MockProtocolSession(new MemoryMessageStore());
- // Use the first Virtualhost that has been defined to initialise
- // the MockProtocolSession. This prevents a NPE when the
- // AMQPActor attempts to lookup the name of the VHost.
- try
- {
- session.setVirtualHost(ApplicationRegistry.getInstance().
- getVirtualHostRegistry().getVirtualHosts().
- toArray(new VirtualHost[1])[0]);
- }
- catch (AMQException e)
- {
- fail("Unable to set virtualhost on session:" + e.getMessage());
- }
+ VirtualHost virtualHost = ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().iterator().next();
+
+ // Create a single session for this test.
+ _session = new InternalTestProtocolSession(virtualHost);
}
public void testFIFO() throws AMQException
@@ -89,7 +77,7 @@ public class CurrentActorTest extends TestCase
// Create a new actor using retrieving the rootMessageLogger from
// the default ApplicationRegistry.
//fixme reminder that we need a better approach for broker testing.
- AMQPConnectionActor connectionActor = new AMQPConnectionActor(session,
+ AMQPConnectionActor connectionActor = new AMQPConnectionActor(_session,
ApplicationRegistry.getInstance().
getRootMessageLogger());
@@ -120,7 +108,7 @@ public class CurrentActorTest extends TestCase
* to push the actor on to the stack
*/
- AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore());
+ AMQChannel channel = new AMQChannel(_session, 1, _session.getVirtualHost().getMessageStore());
AMQPChannelActor channelActor = new AMQPChannelActor(channel,
ApplicationRegistry.getInstance().
@@ -218,7 +206,7 @@ public class CurrentActorTest extends TestCase
// Create a new actor using retrieving the rootMessageLogger from
// the default ApplicationRegistry.
//fixme reminder that we need a better approach for broker testing.
- AMQPConnectionActor actor = new AMQPConnectionActor(session,
+ AMQPConnectionActor actor = new AMQPConnectionActor(_session,
ApplicationRegistry.getInstance().
getRootMessageLogger());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
index 2a37eae728..b4dd3da2e6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
@@ -27,15 +27,12 @@ public class ChannelMessagesTest extends AbstractTestMessages
{
public void testMessage1001()
{
- Integer prefetch = 12345;
-
- _logMessage = ChannelMessages.CHN_1001(prefetch);
+ _logMessage = ChannelMessages.CHN_1001();
List<Object> log = performLog();
// We use the MessageFormat here as that is what the ChannelMessage
// will do, this makes the resulting value 12,345
- String[] expected = {"Create", "Prefetch",
- MessageFormat.format("{0, number}", prefetch)};
+ String[] expected = {"Create"};
validateLogMessage(log, "CHN-1001", expected);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java
index eb76029a5c..d234c88210 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java
@@ -24,12 +24,12 @@ import java.util.List;
public class ConnectionMessagesTest extends AbstractTestMessages
{
- public void testMessage1001_WithProtocolVersion()
+ public void testMessage1001_WithClientIDProtocolVersion()
{
String clientID = "client";
String protocolVersion = "8-0";
- _logMessage = ConnectionMessages.CON_1001(clientID, protocolVersion, true);
+ _logMessage = ConnectionMessages.CON_1001(clientID, protocolVersion, true , true);
List<Object> log = performLog();
String[] expected = {"Open :", "Client ID", clientID,
@@ -38,11 +38,11 @@ public class ConnectionMessagesTest extends AbstractTestMessages
validateLogMessage(log, "CON-1001", expected);
}
- public void testMessage1001_NoProtocolVersion()
+ public void testMessage1001_WithClientIDNoProtocolVersion()
{
String clientID = "client";
- _logMessage = ConnectionMessages.CON_1001(clientID, null, false);
+ _logMessage = ConnectionMessages.CON_1001(clientID, null,true, false);
List<Object> log = performLog();
String[] expected = {"Open :", "Client ID", clientID};
@@ -50,6 +50,29 @@ public class ConnectionMessagesTest extends AbstractTestMessages
validateLogMessage(log, "CON-1001", expected);
}
+ public void testMessage1001_WithNOClientIDProtocolVersion()
+ {
+ String protocolVersion = "8-0";
+
+ _logMessage = ConnectionMessages.CON_1001(null, protocolVersion, false , true);
+ List<Object> log = performLog();
+
+ String[] expected = {"Open", ": Protocol Version :", protocolVersion};
+
+ validateLogMessage(log, "CON-1001", expected);
+ }
+
+ public void testMessage1001_WithNoClientIDNoProtocolVersion()
+ {
+ _logMessage = ConnectionMessages.CON_1001(null, null,false, false);
+ List<Object> log = performLog();
+
+ String[] expected = {"Open"};
+
+ validateLogMessage(log, "CON-1001", expected);
+ }
+
+
public void testMessage1002()
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java
index d7a5aa667b..4b69a46793 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java
@@ -77,10 +77,8 @@ public class Log4jMessageLoggerTest extends TestCase
* Verify that the default configuraion of Log4jMessageLogger will
* log a message.
*
- * @throws IOException
- * @throws InterruptedException
*/
- public void testDefaultLogsMessage() throws IOException, InterruptedException
+ public void testDefaultLogsMessage()
{
// Create a logger to test
Log4jMessageLogger logger = new Log4jMessageLogger();
@@ -95,28 +93,17 @@ public class Log4jMessageLoggerTest extends TestCase
}
/**
- * This test checks that if the Root Logger level is set such that the INFO
- * messages would not be logged then the Log4jMessageLogger default of INFO
- * will result in logging not being presented.
+ * This test verifies that the Log4jMessageLogger does not inherit a logging
+ * level from the RootLogger. The Log4jMessageLogger default of INFO
+ * will result in logging being presented.
*
- * @throws IOException
- * @throws InterruptedException
*/
- public void testDefaultsLogsAtInfo() throws IOException, InterruptedException
+ public void testLoggerDoesNotInheritRootLevel()
{
- // Create a logger to test
- Log4jMessageLogger logger = new Log4jMessageLogger();
-
- //Create Message for test
- String message = "testDefaults";
-
//Set default logger level to off
- Logger.getRootLogger().setLevel(Level.WARN);
+ Logger.getRootLogger().setLevel(Level.OFF);
- // Log the message
- logger.rawMessage(message);
-
- verifyNoLog(message);
+ testDefaultLogsMessage();
}
/**
@@ -125,10 +112,8 @@ public class Log4jMessageLoggerTest extends TestCase
* Test this by setting the default logger level to off which has been
* verified to work by test 'testDefaultsLevelObeyed'
*
- * @throws IOException
- * @throws InterruptedException
*/
- public void testDefaultLoggerAdjustment() throws IOException, InterruptedException
+ public void testDefaultLoggerAdjustment()
{
String loggerName = "TestLogger";
// Create a logger to test
@@ -150,41 +135,6 @@ public class Log4jMessageLoggerTest extends TestCase
Logger.getLogger(Log4jMessageLogger.DEFAULT_LOGGER).setLevel(originalLevel);
}
- /**
- * Test that changing the log level has an effect.
- * Set the level to be debug
- * but only set the logger to log at INFO
- * there should be no data printed
- * subsequently changing the root logger to allow DEBUG should
- * show the message
- *
- * @throws IOException
- * @throws InterruptedException
- */
- public void testDefaultsLevelObeyed() throws IOException, InterruptedException
- {
- // Create a logger to test
- Log4jMessageLogger logger = new Log4jMessageLogger("DEBUG", Log4jMessageLogger.DEFAULT_LOGGER);
-
- //Create Message for test
- String message = "testDefaults";
-
- //Set root logger to INFO only
- Logger.getRootLogger().setLevel(Level.INFO);
-
- // Log the message
- logger.rawMessage(message);
-
- verifyNoLog(message);
-
- //Set root logger to INFO only
- Logger.getRootLogger().setLevel(Level.DEBUG);
-
- // Log the message
- logger.rawMessage(message);
-
- verifyLogPresent(message);
- }
/**
* Check that the Log Message reached log4j
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
index 02cdbdbe6a..56d611a44f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
@@ -36,6 +36,9 @@ import org.apache.qpid.server.logging.actors.TestBlankActor;
import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.List;
@@ -44,6 +47,19 @@ public abstract class AbstractTestLogSubject extends TestCase
protected Configuration _config = new PropertiesConfiguration();
protected LogSubject _subject = null;
+ AMQProtocolSession _session;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ VirtualHost virtualHost = ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().iterator().next();
+
+ // Create a single session for this test.
+ _session = new InternalTestProtocolSession(virtualHost);
+ }
+
protected List<Object> performLog() throws ConfigurationException
{
if (_subject == null)
@@ -96,15 +112,19 @@ public abstract class AbstractTestLogSubject extends TestCase
assertEquals("Username not as expected", userNameParts[0], user);
// Extract IP.
+ // The connection will be of the format - guest@/127.0.0.1:1/test
+ // and so our userNamePart will be '/127.0.0.1:1/test'
String[] ipParts = userNameParts[1].split("/");
+ // We will have three sections
assertEquals("Unable to split IP from rest of Connection:"
- + userNameParts[1], 2, ipParts.length);
+ + userNameParts[1], 3, ipParts.length);
- assertEquals("IP not as expected", ipParts[0], ipString);
+ // We need to skip the first '/' split will be empty so validate 1 as IP
+ assertEquals("IP not as expected", ipString, ipParts[1]);
- //Finally check vhost
- assertEquals("Virtualhost name not as expected.", vhost, ipParts[1]);
+ //Finally check vhost which is section 2
+ assertEquals("Virtualhost name not as expected.", vhost, ipParts[2]);
}
/**
@@ -172,7 +192,7 @@ public abstract class AbstractTestLogSubject extends TestCase
* @param message the message to search
* @param vhost the vhostName to check against
*/
- protected void verifyVirtualHost(String message, VirtualHost vhost)
+ static public void verifyVirtualHost(String message, VirtualHost vhost)
{
String vhostSlice = getSlice("vh", message);
@@ -199,7 +219,7 @@ public abstract class AbstractTestLogSubject extends TestCase
*
* @return the slice if found otherwise null is returned
*/
- protected String getSlice(String sliceID, String message)
+ static public String getSlice(String sliceID, String message)
{
int indexOfSlice = message.indexOf(sliceID + "(");
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
index 9d5cb70f4b..75f31d53d1 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
@@ -20,13 +20,7 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.MockProtocolSession;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHost;
public class ChannelLogSubjectTest extends ConnectionLogSubjectTest
{
@@ -37,24 +31,6 @@ public class ChannelLogSubjectTest extends ConnectionLogSubjectTest
{
super.setUp();
- // Create a single session for this test.
- // Re-use is ok as we are testing the LogActor object is set correctly,
- // not the value of the output.
- _session = new MockProtocolSession(new MemoryMessageStore());
- // Use the first Virtualhost that has been defined to initialise
- // the MockProtocolSession. This prevents a NPE when the
- // AMQPActor attempts to lookup the name of the VHost.
- try
- {
- _session.setVirtualHost(ApplicationRegistry.getInstance().
- getVirtualHostRegistry().getVirtualHosts().
- toArray(new VirtualHost[1])[0]);
- }
- catch (AMQException e)
- {
- fail("Unable to set virtualhost on session:" + e.getMessage());
- }
-
AMQChannel channel = new AMQChannel(_session, _channelID, _session.getVirtualHost().getMessageStore());
_subject = new ChannelLogSubject(channel);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
index ff2d9b5e11..0eb9901757 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
@@ -20,39 +20,13 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.MockProtocolSession;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
public class ConnectionLogSubjectTest extends AbstractTestLogSubject
{
- AMQProtocolSession _session;
public void setUp() throws Exception
{
super.setUp();
- // Create a single session for this test.
- // Re-use is ok as we are testing the LogActor object is set correctly,
- // not the value of the output.
- _session = new MockProtocolSession(new MemoryMessageStore());
- // Use the first Virtualhost that has been defined to initialise
- // the MockProtocolSession. This prevents a NPE when the
- // AMQPActor attempts to lookup the name of the VHost.
- try
- {
- _session.setVirtualHost(ApplicationRegistry.getInstance().
- getVirtualHostRegistry().getVirtualHosts().
- toArray(new VirtualHost[1])[0]);
- }
- catch (AMQException e)
- {
- fail("Unable to set virtualhost on session:" + e.getMessage());
- }
-
_subject = new ConnectionLogSubject(_session);
}
@@ -63,7 +37,7 @@ public class ConnectionLogSubjectTest extends AbstractTestLogSubject
*/
protected void validateLogStatement(String message)
{
- verifyConnection(_session.getSessionID(), "MockProtocolSessionUser", "null", "test", message);
+ verifyConnection(_session.getSessionID(), "InternalTestProtocolSession", "127.0.0.1:1", "test", message);
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
new file mode 100644
index 0000000000..624421f447
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+public class MessageStoreLogSubjectTest extends AbstractTestLogSubject
+{
+ VirtualHost _testVhost;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().
+ getVirtualHost("test");
+
+ _subject = new MessagesStoreLogSubject(_testVhost, _testVhost.getMessageStore());
+ }
+
+ /**
+ * Validate that the logged Subject message is as expected:
+ * MESSAGE [Blank][vh(/test)/ms(MemoryMessageStore)] <Log Message>
+ * @param message the message whos format needs validation
+ */
+ @Override
+ protected void validateLogStatement(String message)
+ {
+ verifyVirtualHost(message, _testVhost);
+
+ String msSlice = getSlice("ms", message);
+
+ assertNotNull("MessageStore not found:" + message, msSlice);
+
+ assertEquals("MessageStore not correct",
+ _testVhost.getMessageStore().getClass().getSimpleName(),
+ msSlice);
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java
index 0b0b0d78d1..e217497b7b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java
@@ -20,17 +20,13 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.flow.LimitlessCreditManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.MockProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactory;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -57,31 +53,13 @@ public class SubscriptionLogSubjectTest extends AbstractTestLogSubject
_queue = new MockAMQQueue("QueueLogSubjectTest");
((MockAMQQueue) _queue).setVirtualHost(_testVhost);
- // Create a single session for this test.
- // Re-use is ok as we are testing the LogActor object is set correctly,
- // not the value of the output.
- AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
- // Use the first Virtualhost that has been defined to initialise
- // the MockProtocolSession. This prevents a NPE when the
- // AMQPActor attempts to lookup the name of the VHost.
- try
- {
- session.setVirtualHost(ApplicationRegistry.getInstance().
- getVirtualHostRegistry().getVirtualHosts().
- toArray(new VirtualHost[1])[0]);
- }
- catch (AMQException e)
- {
- fail("Unable to set virtualhost on session:" + e.getMessage());
- }
+ AMQChannel channel = new AMQChannel(_session, _channelID, _session.getVirtualHost().getMessageStore());
- AMQChannel channel = new AMQChannel(session, _channelID, session.getVirtualHost().getMessageStore());
-
- session.addChannel(channel);
+ _session.addChannel(channel);
SubscriptionFactory factory = new SubscriptionFactoryImpl();
- _subscription = factory.createSubscription(_channelID, session, new AMQShortString("cTag"),
+ _subscription = factory.createSubscription(_channelID, _session, new AMQShortString("cTag"),
_acks, _filters, _noLocal,
new LimitlessCreditManager());
@@ -102,13 +80,13 @@ public class SubscriptionLogSubjectTest extends AbstractTestLogSubject
String subscriptionSlice = getSlice("sub:"
+ _subscription.getSubscriptionID(),
message);
-
+
assertNotNull("Unable to locate subscription 'sub:" +
_subscription.getSubscriptionID() + "'");
// Adding the ')' is a bit ugly but SubscriptionLogSubject is the only
// Subject that nests () and so the simple parser of checking for the
// next ')' falls down.
- verifyQueue(subscriptionSlice+")", _queue);
+ verifyQueue(subscriptionSlice + ")", _queue);
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index f09b03ab85..6050512679 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -111,8 +111,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
_protocolSession =
- new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true),
- null);
+ new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true));
// Need to authenticate session for it to work, (well for logging to work)
_protocolSession.setAuthorizedID(new Principal()
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 49c5f8a14b..37dfead2e5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -26,7 +26,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.HashMap;
@@ -34,7 +34,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import java.net.SocketAddress;
+import java.security.Principal;
public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
{
@@ -42,7 +42,7 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen
final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
private AtomicInteger _deliveryCount = new AtomicInteger(0);
- public InternalTestProtocolSession() throws AMQException
+ public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException
{
super(new TestIoSession(),
ApplicationRegistry.getInstance().getVirtualHostRegistry(),
@@ -50,6 +50,16 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen
_channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
+ // Need to authenticate session for it to be representative testing.
+ setAuthorizedID(new Principal()
+ {
+ public String getName()
+ {
+ return "InternalTestProtocolSession";
+ }
+ });
+
+ setVirtualHost(virtualHost);
}
public ProtocolOutputConverter getProtocolOutputConverter()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
index 9597c1319a..7e16737a83 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.AMQException;
@@ -41,7 +42,10 @@ public class MaxChannelsTest extends TestCase
public void testChannels() throws Exception
{
_session = new AMQMinaProtocolSession(new TestIoSession(), _appRegistry
- .getVirtualHostRegistry(), new AMQCodecFactory(true), null);
+ .getVirtualHostRegistry(), new AMQCodecFactory(true));
+
+ // Set the current Actor for these tests
+ CurrentActor.set(_session.getLogActor());
// Need to authenticate session for it to work, (well for logging to work)
_session.setAuthorizedID(new Principal()
@@ -92,6 +96,11 @@ public class MaxChannelsTest extends TestCase
// Yikes
fail(e.getMessage());
}
+ finally
+ {
+ //Remove the actor set during the test
+ CurrentActor.remove();
+ }
ApplicationRegistry.remove(1);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 6589f46c7b..db7312202b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
@@ -49,6 +50,7 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Collections;
import java.util.Set;
+import java.security.Principal;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
@@ -184,7 +186,6 @@ public class AMQQueueAlertTest extends TestCase
*/
public void testQueueDepthAlertWithSubscribers() throws Exception
{
- _protocolSession = new InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
_protocolSession.addChannel(channel);
@@ -295,12 +296,13 @@ public class AMQQueueAlertTest extends TestCase
super.setUp();
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(1);
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
- _protocolSession = new InternalTestProtocolSession();
-
+ _protocolSession = new InternalTestProtocolSession(_virtualHost);
+ CurrentActor.set(_protocolSession.getLogActor());
}
protected void tearDown()
{
+ CurrentActor.remove();
ApplicationRegistry.remove(1);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 1138b465cd..1acf8a3c31 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -175,7 +175,8 @@ public class AMQQueueMBeanTest extends TestCase
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- InternalTestProtocolSession protocolSession = new InternalTestProtocolSession();
+ InternalTestProtocolSession protocolSession = new InternalTestProtocolSession(_virtualHost);
+
AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
protocolSession.addChannel(channel);
@@ -372,7 +373,7 @@ public class AMQQueueMBeanTest extends TestCase
null);
_queueMBean = new AMQQueueMBean(_queue);
- _protocolSession = new InternalTestProtocolSession();
+ _protocolSession = new InternalTestProtocolSession(_virtualHost);
}
public void tearDown()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 9c2932c5e2..d360904dd7 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -29,6 +29,9 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.flow.LimitlessCreditManager;
@@ -55,7 +58,7 @@ public class AckTest extends TestCase
private Subscription _subscription;
- private MockProtocolSession _protocolSession;
+ private AMQProtocolSession _protocolSession;
private TestMemoryMessageStore _messageStore;
@@ -66,19 +69,21 @@ public class AckTest extends TestCase
private AMQQueue _queue;
private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag");
+ private VirtualHost _virtualHost;
protected void setUp() throws Exception
{
super.setUp();
ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
+ _virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
_messageStore = new TestMemoryMessageStore();
- _protocolSession = new MockProtocolSession(_messageStore);
+ _protocolSession = new InternalTestProtocolSession(_virtualHost);
_channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
_protocolSession.addChannel(_channel);
- _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, _virtualHost,
null);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
deleted file mode 100644
index fe79c40bb9..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQConnectionException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.transport.Sender;
-
-import javax.security.sasl.SaslServer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.security.Principal;
-import java.net.SocketAddress;
-
-/**
- * A protocol session that can be used for testing purposes.
- */
-public class MockProtocolSession implements AMQProtocolSession
-{
- private MessageStore _messageStore;
-
- private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
-
- private static final AtomicLong idGenerator = new AtomicLong(0);
-
- private final long _sessionID = idGenerator.getAndIncrement();
- private VirtualHost _virtualHost;
-
- public MockProtocolSession(MessageStore messageStore)
- {
- _messageStore = messageStore;
- }
-
- public long getSessionID()
- {
- return _sessionID;
- }
-
- public void dataBlockReceived(AMQDataBlock message) throws Exception
- {
- }
-
- public void writeFrame(AMQDataBlock frame)
- {
- }
-
- public AMQShortString getContextKey()
- {
- return null;
- }
-
- public void setContextKey(AMQShortString contextKey)
- {
- }
-
- public AMQChannel getChannel(int channelId)
- {
- AMQChannel channel = _channelMap.get(channelId);
- if (channel == null)
- {
- throw new IllegalArgumentException("Invalid channel id: " + channelId);
- }
- else
- {
- return channel;
- }
- }
-
- public void addChannel(AMQChannel channel)
- {
- if (channel == null)
- {
- throw new IllegalArgumentException("Channel must not be null");
- }
- else
- {
- _channelMap.put(channel.getChannelId(), channel);
- }
- }
-
- public void closeChannel(int channelId) throws AMQException
- {
- }
-
- public void closeChannelOk(int channelId)
- {
-
- }
-
- public boolean channelAwaitingClosure(int channelId)
- {
- return false;
- }
-
- public void removeChannel(int channelId)
- {
- _channelMap.remove(channelId);
- }
-
- public void initHeartbeats(int delay)
- {
- }
-
- public void closeSession() throws AMQException
- {
- }
-
- public void closeConnection(int channelId, AMQConnectionException e, boolean closeIoSession) throws AMQException
- {
- }
-
- public Object getKey()
- {
- return null;
- }
-
- public String getLocalFQDN()
- {
- return null;
- }
-
- public SaslServer getSaslServer()
- {
- return null;
- }
-
- public void setSaslServer(SaslServer saslServer)
- {
- }
-
- public FieldTable getClientProperties()
- {
- return null;
- }
-
- public void setClientProperties(FieldTable clientProperties)
- {
- }
-
- public Object getClientIdentifier()
- {
- return null;
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- public void setVirtualHost(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
- }
-
- public void addSessionCloseTask(Task task)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void removeSessionCloseTask(Task task)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public ProtocolOutputConverter getProtocolOutputConverter()
- {
- return ProtocolOutputConverterRegistry.getConverter(this);
- }
-
- public void setAuthorizedID(Principal authorizedID)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Principal getPrincipal()
- {
- return new Principal()
- {
- public String getName()
- {
- return "MockProtocolSessionUser";
- }
- };
-
- }
-
- public SocketAddress getRemoteAddress()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public MethodRegistry getMethodRegistry()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void methodFrameReceived(int channelId, AMQMethodBody body)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void contentHeaderReceived(int channelId, ContentHeaderBody body)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void contentBodyReceived(int channelId, ContentBody body)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public MethodDispatcher getMethodDispatcher()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public ProtocolSessionIdentifier getSessionIdentifier()
- {
- return null;
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolVersion().getMajorVersion();
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolVersion().getMinorVersion();
- }
-
-
- public ProtocolVersion getProtocolVersion()
- {
- return ProtocolVersion.getLatestSupportedVersion(); //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- public VersionSpecificRegistry getRegistry()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setSender(Sender<java.nio.ByteBuffer> sender)
- {
- // FIXME AS TODO
-
- }
-
- public void init()
- {
- // TODO Auto-generated method stub
-
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
index 6c6835ccca..8262ca0e29 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
@@ -31,15 +31,15 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.configuration.SecurityConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.plugins.MockPluginManager;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.MockProtocolSession;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
public class ACLManagerTest extends TestCase
{
@@ -64,8 +64,13 @@ public class ACLManagerTest extends TestCase
_pluginManager = new MockPluginManager("");
_authzManager = new ACLManager(_conf, _pluginManager);
-
- _session = new MockProtocolSession(new TestableMemoryMessageStore());
+
+
+ VirtualHost virtualHost = ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().iterator().next();
+
+ // Create a single session for this test.
+ _session = new InternalTestProtocolSession(virtualHost);
}
public void testACLManagerConfigurationPluginManager() throws Exception
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 4317a501ed..30b571ef63 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -352,7 +352,7 @@ public class MessageStoreTest extends TestCase
messageInfo,
new NonTransactionalContext(_virtualHost.getMessageStore(),
new StoreContext(), null, null),
- new InternalTestProtocolSession());
+ new InternalTestProtocolSession(_virtualHost));
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 585ed9a538..b706ee51d8 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -32,6 +32,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
@@ -79,17 +80,8 @@ public class InternalBrokerBaseCase extends TestCase
_queue.bind(defaultExchange, QUEUE_NAME, null);
- _session = new InternalTestProtocolSession();
-
- _session.setAuthorizedID(new Principal()
- {
- public String getName()
- {
- return "InternalBrokerBaseCaseUser";
- }
- });
-
- _session.setVirtualHost(_virtualHost);
+ _session = new InternalTestProtocolSession(_virtualHost);
+ CurrentActor.set(_session.getLogActor());
_channel = new MockChannel(_session, 1, _messageStore);
@@ -98,6 +90,7 @@ public class InternalBrokerBaseCase extends TestCase
public void tearDown() throws Exception
{
+ CurrentActor.remove();
ApplicationRegistry.remove(1);
super.tearDown();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index 84bee7984b..4ecbffb4b4 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -39,6 +39,8 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
import java.util.Collection;
import java.util.HashMap;
@@ -73,6 +75,9 @@ public class TestApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
+ _rootMessageLogger = new RootMessageLoggerImpl(_configuration,
+ new Log4jMessageLogger());
+
Properties users = new Properties();
users.put("guest", "guest");
diff --git a/qpid/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm b/qpid/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm
index 917901a18b..10be2299e9 100644
--- a/qpid/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm
+++ b/qpid/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm
@@ -106,14 +106,26 @@ public class ${type.name}Messages
public static LogMessage ${message.methodName}(#foreach($parameter in ${message.parameters})${parameter.type} ${parameter.name}#if (${velocityCount} != ${message.parameters.size()} ), #end
#end#if(${message.parameters.size()} > 0 && ${message.options.size()} > 0), #end#foreach($option in ${message.options})boolean ${option.name}#if (${velocityCount} != ${message.options.size()} ), #end#end)
{
+##
+## If we don't have any parameters then we don't need the overhead of using the
+## message formatter so we can just set our return message to the retreived
+## fixed string.
+## So we don't need to update the _formatter with the new pattern.
+##
+## Here we setup rawMessage to be the formatted message ready for direct return
+## with the message.name or further processing to remove options.
+##
+#if(${message.parameters.size()} > 0)
final Object[] messageArguments = {#foreach($parameter in ${message.parameters})${parameter.name}#if (${velocityCount} != ${message.parameters.size()} ), #end#end};
_formatter.applyPattern(_messages.getString("${message.name}"));
-
- ## If we have some options then we need to build the message based
- ## on those values so only provide that logic if we need it.
-#if(${message.options.size()} > 0)
String rawMessage = _formatter.format(messageArguments);
+#else
+ String rawMessage = _messages.getString("${message.name}");
+#end
+## If we have some options then we need to build the message based
+## on those values so only provide that logic if we need it.
+#if(${message.options.size()} > 0)
StringBuffer msg =new StringBuffer("${message.name} : ");
// Split the formatted message up on the option values so we can
@@ -124,8 +136,8 @@ public class ${type.name}Messages
int end;
if (parts.length > 1)
{
- ## For Each Optional value check if it has been enabled and then
- ## append it to the log.
+## For Each Optional value check if it has been enabled and then
+## append it to the log.
#foreach($option in ${message.options})
// Add Option : ${option.value}
@@ -142,8 +154,8 @@ public class ${type.name}Messages
final String message = msg.toString();
#else
- ## If we have no options then we can just format and set the log
- final String message = "${message.name} : " + _formatter.format(messageArguments);
+## If we have no options then we can just format and set the log
+ final String message = "${message.name} : " + rawMessage;
#end
return new LogMessage()
diff --git a/qpid/java/common/templates/model/ProtocolVersionListClass.vm b/qpid/java/common/templates/model/ProtocolVersionListClass.vm
index 9ac6adfdf5..d3e943e36f 100644
--- a/qpid/java/common/templates/model/ProtocolVersionListClass.vm
+++ b/qpid/java/common/templates/model/ProtocolVersionListClass.vm
@@ -41,12 +41,14 @@ public class ProtocolVersion implements Comparable
{
private final byte _majorVersion;
private final byte _minorVersion;
+ private final String _stringFormat;
public ProtocolVersion(byte majorVersion, byte minorVersion)
{
_majorVersion = majorVersion;
_minorVersion = minorVersion;
+ _stringFormat = _majorVersion+"-"+_minorVersion;
}
public byte getMajorVersion()
@@ -59,6 +61,11 @@ public class ProtocolVersion implements Comparable
return _minorVersion;
}
+ public String toString()
+ {
+ return _stringFormat;
+ }
+
public int compareTo(Object o)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/AlertingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/AlertingTest.java
deleted file mode 100644
index 205a741b2f..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/AlertingTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.log4j.FileAppender;
-import org.apache.log4j.Logger;
-import org.apache.log4j.SimpleLayout;
-import org.apache.qpid.server.store.DerbyMessageStore;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class AlertingTest extends QpidTestCase
-{
- private String VIRTUALHOST = "test";
- private Session _session;
- private Connection _connection;
- private Queue _destination;
- private MessageConsumer _consumer; // Never read, but does need to be here to create the destination.
- private File _logfile;
- private XMLConfiguration _configuration;
- private int _numMessages;
-
- public void setUp() throws Exception
- {
- // First we munge the config file and, if we're in a VM, set up an additional logfile
-
- _configuration = new XMLConfiguration(_configFile);
- _configuration.setProperty("management.enabled", "false");
- Class storeClass = DerbyMessageStore.class;
- Class bdb = null;
- try {
- bdb = Class.forName("org.apache.qpid.store.berkleydb.BDBMessageStore");
- }
- catch (ClassNotFoundException e)
- {
- // No BDB store, we'll use Derby instead.
- }
- if (bdb != null)
- {
- storeClass = bdb;
- }
-
- _configuration.setProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".store.class", storeClass.getName());
- _numMessages = 50;
-
- File tmpFile = File.createTempFile("configFile", "test");
- tmpFile.deleteOnExit();
- _configuration.save(tmpFile);
- _configFile = tmpFile;
-
-
- if (_outputFile != null)
- {
- _logfile = _outputFile;
- }
- else
- {
- // This is mostly for running the test outside of the ant setup
- _logfile = File.createTempFile("logFile", "test");
- FileAppender appender = new FileAppender(new SimpleLayout(), _logfile.getAbsolutePath());
- appender.setFile(_logfile.getAbsolutePath());
- appender.setImmediateFlush(true);
- Logger.getRootLogger().addAppender(appender);
- //_logfile.deleteOnExit();
- }
-
- // Then we do the normal setup stuff like starting the broker, getting a connection etc.
-
- super.setUp();
-
- _connection = getConnection();
- _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- _destination = _session.createQueue("testQueue");
-
- // Consumer is only used to actually create the destination
- _consumer = _session.createConsumer(_destination);
- }
-
- /**
- * Checks the log file for MESSAGE_COUNT_ALERT, fails() the test if it's not found and
- * places the entire contents in the message to help debug cruise control failures.
- * @throws Exception
- */
- private void wasAlertFired() throws Exception
- {
- // Loop through alerts until we're done or 5 seconds have passed,
- // just in case the logfile takes a while to flush.
- BufferedReader reader = new BufferedReader(new FileReader(_logfile));
- boolean found = false;
- long endtime = System.currentTimeMillis()+5000;
- while (!found && System.currentTimeMillis() < endtime)
- {
- while (reader.ready())
- {
- String line = reader.readLine();
- if (line.contains("MESSAGE_COUNT_ALERT"))
- {
- found = true;
- }
- }
- }
- if (!found)
- {
- StringBuffer message = new StringBuffer("Could not find alert in log file: "+_logfile.getAbsolutePath());
- message.append("\n");
- reader = new BufferedReader(new FileReader(_logfile));
- for (int i = 0; i < 79; i++) { message.append("-"); };
- message.append("\n");
- while (reader.ready()) { message.append(reader.readLine() + "\n");}
- message.append("\n");
- for (int i = 0; i < 79; i++) { message.append("-"); };
- message.append("\n");
- fail(message.toString());
- }
- }
-
- public void testAlertingReallyWorks() throws Exception
- {
- // Send 5 messages, make sure that the alert was fired properly.
- sendMessage(_session, _destination, _numMessages + 1);
- _session.commit();
- wasAlertFired();
- }
-
- public void testAlertingReallyWorksWithRestart() throws Exception
- {
- sendMessage(_session, _destination, _numMessages + 1);
- _session.commit();
- stopBroker();
- (new FileOutputStream(_logfile)).getChannel().truncate(0);
- startBroker();
- wasAlertFired();
- }
-
- public void testAlertingReallyWorksWithChanges() throws Exception
- {
- // send some messages and nuke the logs
- sendMessage(_session, _destination, 2);
- _session.commit();
- stopBroker();
- (new FileOutputStream(_logfile)).getChannel().truncate(0);
-
- // Change max message count to 5, start broker and make sure that that's triggered at the right time
- _configuration.setProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".queues.maximumMessageCount", 5);
- startBroker();
- _connection = getConnection();
- _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
- // Trigger the new value
- sendMessage(_session, _destination, 3);
- _session.commit();
- wasAlertFired();
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java
new file mode 100644
index 0000000000..47ae599d2b
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java
@@ -0,0 +1,262 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.LogMonitor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.HashMap;
+import java.util.LinkedList;
+
+public class AbstractTestLogging extends QpidTestCase
+{
+ protected LogMonitor _monitor;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _monitor = new LogMonitor(_outputFile);
+ }
+
+ /**
+ * assert that the requested log message has not occured
+ *
+ * @param log
+ *
+ * @throws IOException
+ */
+ public void assertLoggingNotYetOccured(String log) throws IOException
+ {
+ // Ensure the alert has not occured yet
+ assertEquals("Message has already occured:" + log, 0,
+ _monitor.findMatches(log).size());
+ }
+
+
+ protected void validateMessageID(String id, String log)
+ {
+ assertEquals("Incorrect message",id, getMessageID(log));
+ }
+
+ protected String getMessageID(String log)
+ {
+ String message = fromMessage(log);
+
+ return message.substring(0, message.indexOf(" "));
+ }
+
+ /**
+ * Return the first channel id from the log string
+ * ' ch;X' if there is no channel id return -1.
+ *
+ * @param log the log string to search.
+ *
+ * @return channel id or -1 if no channel id exists.
+ */
+ protected int getChannelID(String log)
+ {
+ int start = log.indexOf("ch:") + 3;
+
+ // If we do a check for ] as the boundary we will get cases where log
+ // is presented with the bounding. If we don't match a ] then we can use
+ // the end of the string as the boundary.
+ int end = log.indexOf("]", start);
+ if (end == -1)
+ {
+ end = log.length();
+ }
+
+ try
+ {
+ return Integer.parseInt(log.substring(start, end));
+ }
+ catch (Exception e)
+ {
+ return -1;
+ }
+ }
+
+ protected String fromMessage(String log)
+ {
+ int startSubject = log.indexOf("]") + 1;
+ int start = log.indexOf("]", startSubject) + 1;
+
+ // If we don't have a subject then the second indexOf will return 0
+ // in which case we can use the end of the actor as the index.
+ if (start == 0)
+ {
+ start = startSubject;
+ }
+
+ return log.substring(start).trim();
+ }
+
+ /**
+ * Extract the Subject from the Log Message.
+ *
+ * The subject is the second block inclosed in brackets '[ ]'.
+ *
+ * If there is no Subject or the second block of brackets '[ ]' cannot be
+ * identified then an empty String ("") is returned.
+ *
+ * The brackets '[ ]' are not included in the returned String.
+ *
+ * @param log The log message to process
+ * @return the Subject string or the empty string ("") if the subject can't be identified.
+ */
+ protected String fromSubject(String log)
+ {
+ int start = log.indexOf("[") + 1;
+ // Take the second index
+ start = log.indexOf("[", start) + 1;
+
+ // There may not be a subject so in that case return nothing.
+ if (start == 0)
+ {
+ return "";
+ }
+
+ int end = log.indexOf("]", start);
+ try
+ {
+ return log.substring(start, end);
+ }
+ catch (IndexOutOfBoundsException iobe)
+ {
+ return "";
+ }
+ }
+
+ /**
+ * Extract the actor segment from the log message.
+ * The Actor segment is the first section enclosed in '[ ]'.
+ *
+ * No analysis is performed to ensure that the first '[ ]' section of the
+ * given log is really an Actor segment.
+ *
+ * The brackets '[ ]' are not included in the returned String.
+ *
+ * @param log the Log Message
+ * @return the Actor segment or "" if unable to locate '[ ]' section
+ */
+ protected String fromActor(String log)
+ {
+ int start = log.indexOf("[") + 1;
+ int end = log.indexOf("]", start);
+ try
+ {
+ return log.substring(start, end).trim();
+ }
+ catch (IndexOutOfBoundsException iobe)
+ {
+ return "";
+ }
+ }
+
+ /**
+ * Given our log message extract the connection ID:
+ *
+ * The log string will contain the connectionID identified by 'con:'
+ *
+ * So extract the value shown here by X:
+ *
+ * 'con:X('
+ *
+ * Extract the value between the ':' and '(' and process it as an Integer
+ *
+ * If we are unable to find the right index or process the substring as an
+ * Integer then return -1.
+ *
+ * @param log the log String to process
+ * @return the connection ID or -1.
+ */
+ protected int extractConnectionID(String log)
+ {
+ int conIDStart = log.indexOf("con:") + 4;
+ int conIDEnd = log.indexOf("(", conIDStart);
+ try
+ {
+ return Integer.parseInt(log.substring(conIDStart, conIDEnd));
+ }
+ catch (Exception e)
+ {
+ return -1;
+ }
+ }
+
+ /**
+ * Extract the log entry from the raw log line which will contain other
+ * log4j formatting.
+ *
+ * This formatting may impead our testing process so extract the log message
+ * as we know it to be formatted.
+ *
+ * This starts with the string MESSAGE
+ * @param rawLog the raw log
+ * @return the log we are expecting to be printed without the log4j prefixes
+ */
+ protected String getLog(String rawLog)
+ {
+ int start = rawLog.indexOf("MESSAGE");
+ return rawLog.substring(start);
+ }
+
+ /**
+ * Given a list of messages that have been pulled out of a log file
+ * Process the results splitting the log statements in to lists based on the
+ * actor's connection ID.
+ *
+ * So for each log entry extract the Connecition ID from the Actor of the log
+ *
+ * Then use that as a key to a HashMap storing the list of log messages for
+ * that connection.
+ *
+ * @param logMessages The list of mixed connection log messages
+ * @return Map indexed by connection id to a list of log messages just for that connection.
+ */
+ protected HashMap<Integer,List<String>> splitResultsOnConnectionID(List<String> logMessages)
+ {
+ HashMap<Integer,List<String>> connectionSplitList = new HashMap<Integer, List<String>>();
+
+ for (String log : logMessages)
+ {
+ // Get the connectionID from the Actor in the Message Log.
+ int cID = extractConnectionID(fromActor(getLog(log)));
+
+ List<String> connectionData = connectionSplitList.get(cID);
+
+ // Create the initial List if we don't have one already
+ if (connectionData == null)
+ {
+ connectionData = new LinkedList<String>();
+ connectionSplitList.put(cID, connectionData);
+ }
+
+ // Store the log
+ connectionData.add(log);
+ }
+
+ return connectionSplitList;
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java
new file mode 100644
index 0000000000..14eec8daff
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java
@@ -0,0 +1,202 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.logging;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.FileUtils;
+import org.apache.qpid.util.LogMonitor;
+
+import javax.jms.Connection;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.io.File;
+
+public class AlertingTest extends AbstractTestLogging
+{
+ private String VIRTUALHOST = "test";
+ private Session _session;
+ private Connection _connection;
+ private Queue _destination;
+ private int _numMessages;
+
+ private static final int ALERT_LOG_WAIT_PERIOD = 5000;
+ private static final String MESSAGE_COUNT_ALERT = "MESSAGE_COUNT_ALERT";
+
+ public void setUp() throws Exception
+ {
+ // set QPID_WORK to be [QPID_WORK|io.tmpdir]/<testName>
+ // This ensures that each of these tests operate independantly.
+ setSystemProperty("QPID_WORK",
+ System.getProperty("QPID_WORK",
+ System.getProperty("java.io.tmpdir"))
+ + File.separator + getName());
+
+ // Update the configuration to make our virtualhost Persistent.
+ makeVirtualHostPersistent(VIRTUALHOST);
+
+ _numMessages = 50;
+
+ // Then we do the normal setup stuff like starting the broker, getting a connection etc.
+ super.setUp();
+
+ setupConnection();
+ }
+
+ /**
+ * Create a new connection and ensure taht our destination queue is created
+ * and bound.
+ *
+ * Note that the tests here that restart the broker rely on persistence.
+ * However, the queue creation here is transient. So the queue will not be
+ * rebound on restart. Hence the consumer creation here rather than just the
+ * once.
+ *
+ * The persistent messages will recreate the queue but not bind it (as it
+ * was not a durable queue) However, the consumer creation here will ensure
+ * that the queue is correctly bound and can receive new messages.
+ *
+ * @throws Exception
+ */
+ private void setupConnection()
+ throws Exception
+ {
+ _connection = getConnection();
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _destination = _session.createQueue("testQueue");
+
+ // Consumer is only used to actually create the destination
+ _session.createConsumer(_destination).close();
+ }
+
+ /**
+ * Checks the log file for MESSAGE_COUNT_ALERT, fails() the test if it's not found and
+ * places the entire contents in the message to help debug cruise control failures.
+ *
+ * @throws Exception
+ */
+ private void wasAlertFired() throws Exception
+ {
+ if (!_monitor.waitForMessage(MESSAGE_COUNT_ALERT, ALERT_LOG_WAIT_PERIOD))
+ {
+ StringBuffer message = new StringBuffer("Could not find 'MESSAGE_COUNT_ALERT' in log file: " + _monitor.getMonitoredFile().getAbsolutePath());
+ message.append("\n");
+
+ // Add the current contents of the log file to test output
+ message.append(_monitor.readFile());
+
+ // Write the server config file to test output
+ message.append("Server configuration file in use:\n");
+ message.append(FileUtils.readFileAsString(_configFile));
+
+ // Write the virtualhost config file to test output
+ message.append("\nVirtualhost configuration file in use:\n");
+ message.append(FileUtils.readFileAsString(ServerConfiguration.
+ flatConfig(_configFile).getString("virtualhosts")));
+
+ fail(message.toString());
+ }
+ }
+
+ public void testAlertingReallyWorks() throws Exception
+ {
+ // Send 5 messages, make sure that the alert was fired properly.
+ sendMessage(_session, _destination, _numMessages + 1);
+ _session.commit();
+ wasAlertFired();
+ }
+
+ public void testAlertingReallyWorksWithRestart() throws Exception
+ {
+ sendMessage(_session, _destination, _numMessages + 1);
+ _session.commit();
+ stopBroker();
+
+ // Rest the monitoring clearing the current output file.
+ _monitor.reset();
+ startBroker();
+ wasAlertFired();
+ }
+
+ /**
+ * Test that if the alert value is change from the previous value we can
+ * still get alerts.
+ *
+ * Test sends two messages to the broker then restarts the broker with new
+ * configuration.
+ *
+ * If the test is running inVM the test validates that the new configuration
+ * has been applied.
+ *
+ * Validates that we only have two messages on the queue and then sends
+ * enough messages to trigger the alert.
+ *
+ * The alert is then validate.
+ *
+ *
+ * @throws Exception
+ */
+ public void testAlertingReallyWorksWithChanges() throws Exception
+ {
+ // send some messages and nuke the logs
+ sendMessage(_session, _destination, 2);
+ _session.commit();
+ // To prevent any failover/retry/connection dropped errors
+ _connection.close();
+
+ stopBroker();
+
+ _monitor.reset();
+
+ // Change max message count to 5, start broker and make sure that that's triggered at the right time
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".queues.maximumMessageCount", "5");
+
+ startBroker();
+
+ if (!isExternalBroker())
+ {
+ assertEquals("Alert Max Msg Count is not correct", 5, ApplicationRegistry.getInstance().getVirtualHostRegistry().
+ getVirtualHost(VIRTUALHOST).getQueueRegistry().getQueue(new AMQShortString(_destination.getQueueName())).
+ getMaximumMessageCount());
+ }
+
+ setupConnection();
+
+ // Validate the queue depth is as expected
+ long messageCount = ((AMQSession) _session).getQueueDepth((AMQDestination) _destination);
+ assertEquals("Broker has invalid message count for test", 2, messageCount);
+
+ // Ensure the alert has not occured yet
+ assertLoggingNotYetOccured(MESSAGE_COUNT_ALERT);
+
+ // Trigger the new value
+ sendMessage(_session, _destination, 3);
+ _session.commit();
+
+ // Validate that the alert occured.
+ wasAlertFired();
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
new file mode 100644
index 0000000000..2de6b08751
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.io.File;
+import java.util.List;
+
+public class ChannelLoggingTest extends AbstractTestLogging
+{
+ private static final String CHANNEL_PREFIX = "CHN-";
+
+ public void setUp() throws Exception
+ {
+ // set QPID_WORK to be [QPID_WORK|io.tmpdir]/<testName>
+ setSystemProperty("QPID_WORK",
+ System.getProperty("QPID_WORK",
+ System.getProperty("java.io.tmpdir"))
+ + File.separator + getName());
+
+ //Start the broker
+ super.setUp();
+ }
+
+ /**
+ * Description:
+ * When a new Channel (JMS Session) is created this will be logged as a CHN-1001 Create message. The messages will contain the prefetch details about this new Channel.
+ * Input:
+ *
+ * 1. Running Broker
+ * 2. New JMS Session/Channel creation
+ *
+ * Output:
+ * <date> CHN-1001 : Create : Prefetch <count>
+ *
+ * Validation Steps:
+ * 3. The CHN ID is correct
+ * 4. The prefetch value matches that defined by the requesting client.
+ *
+ * @throws Exception - if an error occurs
+ */
+ public void testChannelCreate() throws Exception
+ {
+ assertLoggingNotYetOccured(CHANNEL_PREFIX);
+
+ Connection connection = getConnection();
+
+ // Test that calling session.close gives us the expected output
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ List<String> results = _monitor.findMatches(CHANNEL_PREFIX);
+
+ // Validation
+
+ assertEquals("CHN messages not logged", 1, results.size());
+
+ String log = getLog(results.get(0));
+ // MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1001 : Create
+ //1 & 2
+ validateMessageID("CHN-1001", log);
+ assertEquals("Incorrect Channel in actor:"+fromActor(log), 1, getChannelID(fromActor(log)));
+
+ connection.close();
+ }
+
+ /**
+ * Description:
+ * The Java Broker implements consumer flow control for all ack modes except
+ * No-Ack. When a client connects the session's flow is initially set to
+ * Stopped. Verify this message appears
+ *
+ * Input:
+ * 1. Running broker
+ * 2. Create consumer
+ * Output:
+ *
+ * <date> CHN-1002 : Flow Stopped
+ *
+ * Validation Steps:
+ * 4. The CHN ID is correct
+ *
+ * @throws Exception - if an error occurs
+ */
+
+ public void testChannelStartsFlowStopped() throws Exception
+ {
+ assertLoggingNotYetOccured(CHANNEL_PREFIX);
+
+ Connection connection = getConnection();
+
+ // Create a session to fill up
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = (Queue) getInitialContext().lookup(QUEUE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connection.start();
+
+ List<String> results = _monitor.findMatches(CHANNEL_PREFIX);
+
+ // The last channel message should be:
+ //
+ // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow Off
+
+ // Verify
+ int resultSize = results.size();
+ String log = getLog(results.get(resultSize - 1));
+
+ validateMessageID("CHN-1002", log);
+ assertTrue("Message should be Flow Stopped", fromMessage(log).endsWith("Flow Stopped"));
+
+ }
+
+ /**
+ * Description:
+ * The Java Broker implements consumer flow control for all ack modes except
+ * No-Ack. When the client first attempts to receive a message then the Flow
+ * status of the Session is set to Started.
+ *
+ * Input:
+ * 1. Running broker
+ * 2. Create a consumer
+ * 3. Attempt to receive a message
+ * Output:
+ *
+ * <date> CHN-1002 : Flow Started
+ *
+ * Validation Steps:
+ * 4. The CHN ID is correct
+ *
+ * @throws Exception - if an error occurs
+ */
+
+ public void testChannelStartConsumerFlowStarted() throws Exception
+ {
+ assertLoggingNotYetOccured(CHANNEL_PREFIX);
+
+ Connection connection = getConnection();
+
+ // Create a session to fill up
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = (Queue) getInitialContext().lookup(QUEUE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connection.start();
+
+ //Call receive to send the Flow On message
+ consumer.receiveNoWait();
+
+ List<String> results = _monitor.findMatches(CHANNEL_PREFIX);
+
+ // The last two channel messages should be:
+ //
+ // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On
+
+ // Verify
+
+ int resultSize = results.size();
+ String log = getLog(results.get(resultSize - 1));
+
+ validateMessageID("CHN-1002", log);
+ assertTrue("Message should be Flow Started", fromMessage(log).endsWith("Flow Started"));
+
+ }
+
+ /**
+ * Description:
+ * When the client gracefully closes the Connection then a CHN-1003 Close
+ * message will be issued. This must be the last message logged for this
+ * Channel.
+ * Input:
+ * 1. Running Broker
+ * 2. Connected Client
+ * 3. Client then requests that the Connection is closed
+ * Output:
+ *
+ * <date> CHN-1003 : Close
+ *
+ * Validation Steps:
+ * 4. The MST ID is correct
+ * 5. This must be the last message logged for this Channel.
+ *
+ * @throws Exception - if an error occurs
+ */
+ public void testChannelCloseViaConnectionClose() throws Exception
+ {
+ assertLoggingNotYetOccured(CHANNEL_PREFIX);
+
+ Connection connection = getConnection();
+
+ // Create a session
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Close the connection to verify the created session closing is logged.
+ connection.close();
+
+ List<String> results = _monitor.findMatches(CHANNEL_PREFIX);
+
+ // The last two channel messages should be:
+ //
+ // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On
+
+ // Verify
+
+ int resultSize = results.size();
+ String log = getLog(results.get(resultSize - 1));
+
+ validateMessageID("CHN-1003", log);
+ assertTrue("Message should be Close:" + fromMessage(log), fromMessage(log).endsWith("Close"));
+ assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromActor(log)));
+ assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromSubject(log)));
+ }
+
+ /**
+ * Description:
+ * When the client gracefully closes the Connection then a CHN-1003 Close
+ * message will be issued. This must be the last message logged for this
+ * Channel.
+ * Input:
+ * 1. Running Broker
+ * 2. Connected Client
+ * 3. Client then requests that the Channel is closed
+ * Output:
+ *
+ * <date> CHN-1003 : Close
+ *
+ * Validation Steps:
+ * 4. The MST ID is correct
+ * 5. This must be the last message logged for this Channel.
+ *
+ * @throws Exception - if an error occurs
+ */
+ public void testChannelCloseViaChannelClose() throws Exception
+ {
+ assertLoggingNotYetOccured(CHANNEL_PREFIX);
+
+ Connection connection = getConnection();
+
+ // Create a session and then close it
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE).close();
+
+ List<String> results = _monitor.findMatches(CHANNEL_PREFIX);
+
+ // The last two channel messages should be:
+ //
+ // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On
+
+ // Verify
+
+ int resultSize = results.size();
+ String log = getLog(results.get(resultSize - 1));
+
+ validateMessageID("CHN-1003", log);
+ assertTrue("Message should be Close:" + fromMessage(getLog(log)), fromMessage(log).endsWith("Close"));
+ assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromActor(log)));
+ assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromSubject(log)));
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java
new file mode 100644
index 0000000000..46f32b1414
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java
@@ -0,0 +1,185 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.logging;
+
+import org.apache.qpid.test.unit.client.forwardall.Client;
+
+import javax.jms.Connection;
+import java.io.File;
+import java.util.List;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.SortedSet;
+import java.util.Collections;
+import java.util.TreeSet;
+
+public class ConnectionLoggingTest extends AbstractTestLogging
+{
+ private static final String CONNECTION_PREFIX = "CON-";
+
+ public void setUp() throws Exception
+ {
+ // set QPID_WORK to be [QPID_WORK|io.tmpdir]/<testName>
+ setSystemProperty("QPID_WORK",
+ System.getProperty("QPID_WORK",
+ System.getProperty("java.io.tmpdir"))
+ + File.separator + getName());
+
+ //Start the broker
+ super.setUp();
+ }
+
+ /**
+ * Description:
+ * When a new connection is made to the broker this must be logged.
+ *
+ * Input:
+ * 1. Running Broker
+ * 2. Connecting client
+ * Output:
+ * <date> CON-1001 : Open : Client ID {0}[ : Protocol Version : {1}] <version>
+ *
+ * Validation Steps:
+ * 1. The CON ID is correct
+ * 2. This is the first CON message for that Connection
+ *
+ * @throws Exception - if an error occurs
+ */
+ public void testConnectionOpen() throws Exception
+ {
+ assertLoggingNotYetOccured(CONNECTION_PREFIX);
+
+ Connection connection = getConnection();
+
+ List<String> results = _monitor.findMatches(CONNECTION_PREFIX);
+
+ // Validation
+ // We should have at least three messages when running InVM but when running External
+ // we will get 0-10 negotiation on con:0 whcih may close at some random point
+ // MESSAGE [con:0(/127.0.0.1:46926)] CON-1001 : Open
+ // MESSAGE [con:0(/127.0.0.1:46926)] CON-1001 : Open : Protocol Version : 0-10
+ // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open
+ // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open : Protocol Version : 0-9
+ // MESSAGE [con:0(/127.0.0.1:46926)] CON-1002 : Close
+ // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9
+
+ //So check how many connections we have in the result set and extract the last one.
+ // When running InVM we will have con:0 and externally con:1
+
+ HashMap<Integer, List<String>> connectionData = splitResultsOnConnectionID(results);
+
+ // Get the last Integer from keySet of the ConnectionData
+ int connectionID = new TreeSet<Integer>(connectionData.keySet()).last();
+
+ //Use just the data from the last connection for the test
+ results = connectionData.get(connectionID);
+
+ // If we are running inVM we will get three open messagse, if running externally weN will also have
+ // open and close messages from the failed 0-10 negotiation
+ assertTrue("CON messages not logged:" + results.size(), results.size() >= 3);
+
+ String log = getLog(results.get(0));
+ // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open
+ //1 & 2
+ validateMessageID("CON-1001",log);
+
+ //We get the size so that we can validate the last three CON- messages
+ int resultsSize = results.size();
+ // This is because when running externally we will also have logged the failed
+ // 0-10 negotiation messages
+
+ // 3 - Assert the options are correct
+ log = getLog(results.get(resultsSize - 1));
+ // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9
+ validateMessageID("CON-1001",log);
+ assertTrue("Client ID option is not present", fromMessage(log).contains("Client ID :"));
+ assertTrue("Client ID value is not present", fromMessage(log).contains(connection.getClientID()));
+
+ assertTrue("Protocol Version option is not present", fromMessage(log).contains("Protocol Version :"));
+ //fixme there is no way currently to find out the negotiated protocol version
+ // The delegate is the versioned class ((AMQConnection)connection)._delegate
+
+ log = getLog(results.get(resultsSize - 2));
+ // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Protocol Version : 0-9
+ validateMessageID("CON-1001",log);
+ assertTrue("Protocol Version option is not present", fromMessage(log).contains("Protocol Version :"));
+ //fixme agani we should check the version
+ // Check that client ID is not present in log
+ assertTrue("Client ID option is present", !fromMessage(log).contains("Client ID :"));
+
+ log = getLog(results.get(resultsSize - 3));
+ validateMessageID("CON-1001",log);
+ // Check that PV is not present in log
+ assertTrue("Protocol Version option is present", !fromMessage(log).contains("Protocol Version :"));
+ // Check that client ID is not present in log
+ assertTrue("Client ID option is present", !fromMessage(log).contains("Client ID :"));
+
+ connection.close();
+ }
+
+ /**
+ * Description:
+ * When a connected client closes the connection this will be logged as a CON-1002 message.
+ * Input:
+ *
+ * 1. Running Broker
+ * 2. Connected Client
+ * Output:
+ *
+ * <date> CON-1002 : Close
+ *
+ * Validation Steps:
+ * 3. The CON ID is correct
+ * 4. This must be the last CON message for the Connection
+ * 5. It must be preceded by a CON-1001 for this Connection
+ */
+ public void testConnectionClose() throws Exception
+ {
+ assertLoggingNotYetOccured(CONNECTION_PREFIX);
+
+ // Open and then close the conneciton
+ getConnection().close();
+
+ List<String> results = _monitor.findMatches(CONNECTION_PREFIX);
+
+ // Validation
+
+ // We should have at least four messages
+ assertTrue("CON messages not logged:" + results.size(), results.size() >= 4);
+
+ //We get the size so that we can validate the last set of CON- messages
+ int resultsSize = results.size();
+
+ // Validate Close message occurs
+ String log = getLog(results.get(resultsSize - 1));
+ validateMessageID("CON-1002",log);
+ assertTrue("Message does not end with close:" + log, log.endsWith("Close"));
+
+ // Extract connection ID to validate there is a CON-1001 messasge for it
+ int connectionID = extractConnectionID(log);
+
+ //Previous log message should be the open
+ log = getLog(results.get(resultsSize - 2));
+ // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9
+ validateMessageID("CON-1001",log);
+ assertEquals("Connection IDs do not match", connectionID, extractConnectionID(fromActor(log)));
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
index 91732bc010..c182db5d78 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -28,19 +28,31 @@ import javax.jms.Message;
import javax.jms.JMSException;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.PrintStream;
+import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.List;
-import java.util.StringTokenizer;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.net.MalformedURLException;
+
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.store.DerbyMessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.jms.BrokerDetails;
@@ -168,6 +180,7 @@ public class QpidTestCase extends TestCase
// the connections created for a given test
protected List<Connection> _connections = new ArrayList<Connection>();
+ public static final String QUEUE = "queue";
public QpidTestCase(String name)
{
@@ -238,7 +251,8 @@ public class QpidTestCase extends TestCase
public void run(TestResult testResult)
{
- if (_exclusionList != null && (_exclusionList.contains(getClass().getName() + "#*") ||
+ if (_exclusionList != null && (_exclusionList.contains(getClass().getPackage().getName() + ".*") ||
+ _exclusionList.contains(getClass().getName() + "#*") ||
_exclusionList.contains(getClass().getName() + "#" + getName())))
{
_logger.info("Test: " + getName() + " is excluded");
@@ -490,6 +504,119 @@ public class QpidTestCase extends TestCase
}
}
+
+ /**
+ * Attempt to set the Java Broker to use the BDBMessageStore for persistence
+ * Falling back to the DerbyMessageStore if
+ *
+ * @param virtualhost - The virtualhost to modify
+ *
+ * @throws ConfigurationException - when reading/writing existing configuration
+ * @throws IOException - When creating a temporary file.
+ */
+ protected void makeVirtualHostPersistent(String virtualhost)
+ throws ConfigurationException, IOException
+ {
+ Class storeClass = DerbyMessageStore.class;
+
+ Class bdb = null;
+ try
+ {
+ bdb = Class.forName("org.apache.qpid.store.berkleydb.BDBMessageStore");
+ }
+ catch (ClassNotFoundException e)
+ {
+ // No BDB store, we'll use Derby instead.
+ }
+
+ if (bdb != null)
+ {
+ storeClass = bdb;
+ }
+
+ // First we munge the config file and, if we're in a VM, set up an additional logfile
+ XMLConfiguration configuration = new XMLConfiguration(_configFile);
+ configuration.setProperty("virtualhosts.virtualhost." + virtualhost +
+ ".store.class", storeClass.getName());
+ configuration.setProperty("virtualhosts.virtualhost." + virtualhost +
+ ".store." + DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY,
+ "${work}");
+
+ File tmpFile = File.createTempFile("configFile", "test");
+ tmpFile.deleteOnExit();
+ configuration.save(tmpFile);
+ _configFile = tmpFile;
+ }
+
+ /**
+ * Set a configuration Property for this test run.
+ *
+ * This creates a new configuration based on the current configuration
+ * with the specified property change.
+ *
+ * Multiple calls to this method will result in multiple temporary
+ * configuration files being created.
+ *
+ * @param property the configuration property to set
+ * @param value the new value
+ * @throws ConfigurationException when loading the current config file
+ * @throws IOException when writing the new config file
+ */
+ protected void setConfigurationProperty(String property, String value)
+ throws ConfigurationException, IOException
+ {
+ XMLConfiguration configuration = new XMLConfiguration(_configFile);
+
+ // If we are modifying a virtualhost value then we need to do so in
+ // the virtualhost.xml file as these values overwrite the values in
+ // the main config.xml file
+ if (property.startsWith("virtualhosts"))
+ {
+ // So locate the virtualhost.xml file and use the ServerConfiguration
+ // flatConfig method to get the interpolated value.
+ String vhostConfigFile = ServerConfiguration.
+ flatConfig(_configFile).getString("virtualhosts");
+
+ // Load the vhostConfigFile
+ XMLConfiguration vhostConfiguration = new XMLConfiguration(vhostConfigFile);
+
+ // Set the value specified in to the vhostConfig.
+ // Remembering that property will be 'virtualhosts.virtulhost....'
+ // so we need to take off the 'virtualhosts.' from the start.
+ vhostConfiguration.setProperty(property.substring(property.indexOf(".") + 1), value);
+
+ // Write out the new virtualhost config file
+ File tmpFile = File.createTempFile("virtualhost-configFile", ".xml");
+ tmpFile.deleteOnExit();
+ vhostConfiguration.save(tmpFile);
+
+ // Change the property and value to be the new virtualhosts file
+ // so that then update the value in the main config file.
+ property = "virtualhosts";
+ value = tmpFile.getAbsolutePath();
+ }
+
+ configuration.setProperty(property, value);
+
+ // Write the new server config file
+ File tmpFile = File.createTempFile("configFile", ".xml");
+ tmpFile.deleteOnExit();
+ configuration.save(tmpFile);
+
+ _logger.info("Qpid Test Case now using configuration File:"
+ + tmpFile.getAbsolutePath());
+
+ _configFile = tmpFile;
+ }
+
+ /**
+ * Set a System property for the duration of this test.
+ *
+ * When the test run is complete the value will be reverted.
+
+ * @param property the property to set
+ * @param value the new value to use
+ */
protected void setSystemProperty(String property, String value)
{
if (!_setProperties.containsKey(property))
@@ -531,6 +658,21 @@ public class QpidTestCase extends TestCase
return _brokerVersion.equals(VERSION_010);
}
+ protected boolean isJavaBroker()
+ {
+ return _brokerLanguage.equals("java");
+ }
+
+ protected boolean isCppBroker()
+ {
+ return _brokerLanguage.equals("cpp");
+ }
+
+ protected boolean isExternalBroker()
+ {
+ return !_broker.equals("vm");
+ }
+
public void restartBroker() throws Exception
{
restartBroker(0);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
new file mode 100644
index 0000000000..84010453e1
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
@@ -0,0 +1,176 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.List;
+
+/**
+ * Utility to simplify the monitoring of Log4j file output
+ *
+ * Monitoring of a given log file can be done alternatively the Monitor will
+ * add a new log4j FileAppender to the root Logger to gather all the available
+ * logging for monitoring
+ */
+public class LogMonitor
+{
+ // The file that the log statements will be written to.
+ private File _logfile;
+
+ /**
+ * Create a new LogMonitor that creates a new Log4j Appender and monitors
+ * all log4j output via the current configuration.
+ *
+ * @throws IOException if there is a problem creating the temporary file.
+ */
+ public LogMonitor() throws IOException
+ {
+ this(null);
+ }
+
+ /**
+ * Create a new LogMonitor on the specified file if the file does not exist
+ * or the value is null then a new Log4j appender will be added and
+ * monitoring set up on that appender.
+ *
+ * NOTE: for the appender to receive any value the RootLogger will need to
+ * have the level correctly configured.ng
+ *
+ * @param file the file to monitor
+ *
+ * @throws IOException if there is a problem creating a temporary file
+ */
+ public LogMonitor(File file) throws IOException
+ {
+ if (file != null && file.exists())
+ {
+ _logfile = file;
+ }
+ else
+ {
+ // This is mostly for running the test outside of the ant setup
+ _logfile = File.createTempFile("LogMonitor", ".log");
+ FileAppender appender = new FileAppender(new SimpleLayout(),
+ _logfile.getAbsolutePath());
+ appender.setFile(_logfile.getAbsolutePath());
+ appender.setImmediateFlush(true);
+ Logger.getRootLogger().addAppender(appender);
+ }
+ }
+
+ /**
+ * Checks the log for instances of the search string.
+ *
+ * The pattern parameter can take any valid argument used in String.contains()
+ *
+ * {@see String.contains(CharSequences)}
+ *
+ * @param pattern the search string
+ *
+ * @return a list of matching lines from the log
+ *
+ * @throws IOException if there is a problem with the file
+ */
+ public List<String> findMatches(String pattern) throws IOException
+ {
+ return FileUtils.searchFile(_logfile, pattern);
+ }
+
+ /**
+ * Checks the log file for a given message to appear.
+ *
+ * @param message the message to wait for in the log
+ * @param wait the time in ms to wait for the message to occur
+ *
+ * @return true if the message was found
+ *
+ * @throws java.io.FileNotFoundException if the Log file can nolonger be found
+ * @throws IOException thrown when reading the log file
+ */
+ public boolean waitForMessage(String message, long wait)
+ throws FileNotFoundException, IOException
+ {
+ // Loop through alerts until we're done or wait ms seconds have passed,
+ // just in case the logfile takes a while to flush.
+ BufferedReader reader = new BufferedReader(new FileReader(_logfile));
+ boolean found = false;
+ long endtime = System.currentTimeMillis() + wait;
+ while (!found && System.currentTimeMillis() < endtime)
+ {
+ while (reader.ready())
+ {
+ String line = reader.readLine();
+ if (line.contains(message))
+ {
+ found = true;
+ }
+ }
+ }
+
+ return found;
+ }
+
+ /**
+ * Read the log file in to memory as a String
+ *
+ * @return the current contents of the log file
+ *
+ * @throws java.io.FileNotFoundException if the Log file can nolonger be found
+ * @throws IOException thrown when reading the log file
+ */
+ public String readFile() throws FileNotFoundException, IOException
+ {
+ return FileUtils.readFileAsString(_logfile);
+ }
+
+ /**
+ * Return a File reference to the monitored file
+ *
+ * @return the file being monitored
+ */
+ public File getMonitoredFile()
+ {
+ return _logfile;
+ }
+
+ /**
+ * Clears the log file and writes: 'Log Monitor Reset' at the start of the file
+ *
+ * @throws java.io.FileNotFoundException if the Log file can nolonger be found
+ * @throws IOException thrown if there is a problem with the log file
+ */
+ public void reset() throws FileNotFoundException, IOException
+ {
+ OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(_logfile));
+ writer.write("Log Monitor Reset\n");
+ writer.close();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
new file mode 100644
index 0000000000..f4dade5660
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
@@ -0,0 +1,302 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+public class LogMonitorTest extends TestCase
+{
+
+ /**
+ * Test that a new file is created when attempting to set up a monitor with
+ * the default constructor.
+ */
+ public void testMonitor()
+ {
+ // Validate that a NPE is thrown with null input
+ try
+ {
+ LogMonitor montior = new LogMonitor();
+ //Validte that the monitor is now running on a new file
+ assertTrue("New file does not have correct name:" + montior.
+ getMonitoredFile().getName(),
+ montior.getMonitoredFile().getName().contains("LogMonitor"));
+ }
+ catch (IOException ioe)
+ {
+ fail("IOE thrown:" + ioe);
+ }
+ }
+
+ /**
+ * Test that creation of a monitor on an existing file is possible
+ *
+ * This also tests taht getMonitoredFile works
+ *
+ * @throws IOException if there is a problem creating the temporary file
+ */
+ public void testMonitorNormalFile() throws IOException
+ {
+ File testFile = File.createTempFile("testMonitorFile", ".log");
+ testFile.deleteOnExit();
+
+ LogMonitor monitor;
+
+ //Ensure that we can create a monitor on a file
+ try
+ {
+ monitor = new LogMonitor(testFile);
+ assertEquals(testFile, monitor.getMonitoredFile());
+ }
+ catch (IOException ioe)
+ {
+ fail("IOE thrown:" + ioe);
+ }
+
+ }
+
+ /**
+ * Test that a new file is created when attempting to set up a monitor on
+ * a null input value.
+ */
+ public void testMonitorNullFile()
+ {
+ // Validate that a NPE is thrown with null input
+ try
+ {
+ LogMonitor montior = new LogMonitor(null);
+ //Validte that the monitor is now running on a new file
+ assertTrue("New file does not have correct name:" + montior.
+ getMonitoredFile().getName(),
+ montior.getMonitoredFile().getName().contains("LogMonitor"));
+ }
+ catch (IOException ioe)
+ {
+ fail("IOE thrown:" + ioe);
+ }
+ }
+
+ /**
+ * Test that a new file is created when attempting to set up a monitor on
+ * a non existing file.
+ *
+ * @throws IOException if there is a problem setting up the nonexistent file
+ */
+ public void testMonitorNonExistentFile() throws IOException
+ {
+ //Validate that we get a FileNotFound if the file does not exist
+
+ File nonexist = File.createTempFile("nonexist", ".out");
+
+ assertTrue("Unable to delete file for our test", nonexist.delete());
+
+ assertFalse("Unable to test as our test file exists.", nonexist.exists());
+
+ try
+ {
+ LogMonitor montior = new LogMonitor(nonexist);
+ //Validte that the monitor is now running on a new file
+ assertTrue("New file does not have correct name:" + montior.
+ getMonitoredFile().getName(),
+ montior.getMonitoredFile().getName().contains("LogMonitor"));
+ }
+ catch (IOException ioe)
+ {
+ fail("IOE thrown:" + ioe);
+ }
+ }
+
+ /**
+ * Test that Log file matches logged messages.
+ *
+ * @throws java.io.IOException if there is a problem creating LogMontior
+ */
+ public void testFindMatches_Match() throws IOException
+ {
+ LogMonitor monitor = new LogMonitor();
+
+ String message = getName() + ": Test Message";
+
+ Logger.getRootLogger().warn(message);
+
+ validateLogContainsMessage(monitor, message);
+ }
+
+ /**
+ * Test that Log file does not match a message not logged.
+ *
+ * @throws java.io.IOException if there is a problem creating LogMontior
+ */
+ public void testFindMatches_NoMatch() throws IOException
+ {
+ LogMonitor monitor = new LogMonitor();
+
+ String message = getName() + ": Test Message";
+
+ Logger.getRootLogger().warn(message);
+
+ String notLogged = "This text was not logged";
+
+ validateLogDoesNotContainsMessage(monitor, notLogged);
+ }
+
+ public void testWaitForMessage_Found() throws IOException
+ {
+ LogMonitor monitor = new LogMonitor();
+
+ String message = getName() + ": Test Message";
+
+ long TIME_OUT = 2000;
+
+ logMessageWithDelay(message, TIME_OUT / 2);
+
+ assertTrue("Message was not logged ",
+ monitor.waitForMessage(message, TIME_OUT));
+ }
+
+ public void testWaitForMessage_Timeout() throws IOException
+ {
+ LogMonitor monitor = new LogMonitor();
+
+ String message = getName() + ": Test Message";
+
+ long TIME_OUT = 2000;
+
+ logMessageWithDelay(message, TIME_OUT);
+
+ // Verify that we can time out waiting for a message
+ assertFalse("Message was logged ",
+ monitor.waitForMessage(message, TIME_OUT / 2));
+
+ // Verify that the message did eventually get logged.
+ assertTrue("Message was never logged.",
+ monitor.waitForMessage(message, TIME_OUT));
+ }
+
+ public void testReset() throws IOException
+ {
+ LogMonitor monitor = new LogMonitor();
+
+ String message = getName() + ": Test Message";
+
+ Logger.getRootLogger().warn(message);
+
+ validateLogContainsMessage(monitor, message);
+
+ String LOG_RESET_TEXT = "Log Monitor Reset";
+
+ validateLogDoesNotContainsMessage(monitor, LOG_RESET_TEXT);
+
+ monitor.reset();
+
+ validateLogContainsMessage(monitor, LOG_RESET_TEXT);
+
+ assertEquals(LOG_RESET_TEXT + "\n", monitor.readFile());
+ }
+
+ public void testRead() throws IOException
+ {
+ LogMonitor monitor = new LogMonitor();
+
+ String message = getName() + ": Test Message";
+
+ Logger.getRootLogger().warn(message);
+
+ String fileContents = monitor.readFile();
+
+ assertTrue("Logged message not found when reading file.",
+ fileContents.contains(message));
+ }
+
+ /****************** Helpers ******************/
+
+ /**
+ * Validate that the LogMonitor does not match the given string in the log
+ *
+ * @param log The LogMonitor to check
+ * @param message The message to check for
+ *
+ * @throws IOException if a problems occurs
+ */
+ protected void validateLogDoesNotContainsMessage(LogMonitor log, String message)
+ throws IOException
+ {
+ List<String> results = log.findMatches(message);
+
+ assertNotNull("Null results returned.", results);
+
+ assertEquals("Incorrect result set size", 0, results.size());
+ }
+
+ /**
+ * Validate that the LogMonitor can match the given string in the log
+ *
+ * @param log The LogMonitor to check
+ * @param message The message to check for
+ *
+ * @throws IOException if a problems occurs
+ */
+ protected void validateLogContainsMessage(LogMonitor log, String message)
+ throws IOException
+ {
+ List<String> results = log.findMatches(message);
+
+ assertNotNull("Null results returned.", results);
+
+ assertEquals("Incorrect result set size", 1, results.size());
+
+ assertTrue("Logged Message'" + message + "' not present in results:"
+ + results.get(0), results.get(0).contains(message));
+ }
+
+ /**
+ * Create a new thread to log the given message after the set delay
+ *
+ * @param message the messasge to log
+ * @param delay the delay (ms) to wait before logging
+ */
+ private void logMessageWithDelay(final String message, final long delay)
+ {
+ new Thread(new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+
+ Logger.getRootLogger().warn(message);
+ }
+ }).start();
+ }
+
+}
diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes
index 69077a97c8..643a26bb9d 100644
--- a/qpid/java/test-profiles/010Excludes
+++ b/qpid/java/test-profiles/010Excludes
@@ -77,3 +77,5 @@ org.apache.qpid.server.store.PersistentStoreTest#*
// QPID-1225 : Temporary remove this test until the problem has been addressed
org.apache.qpid.server.security.acl.SimpleACLTest#testClientPublishInvalidQueueSuccess
+// CPP Broker does not follow the same Logging convention as the Java broker
+org.apache.qpid.server.logging.*