summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-05 10:11:48 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-05 10:11:48 +0000
commitb814e73fa07ec35dea45d58f9a2deaa44630edd9 (patch)
tree0adb70432e7617d238998034a59963795100f6c3
parent2a40f697eaef97b68619eaf7491f1df7084a8754 (diff)
downloadqpid-python-b814e73fa07ec35dea45d58f9a2deaa44630edd9.tar.gz
Merged from trunk up to r796653
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@821735 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp14
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.cpp31
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.h4
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/PipeHandle.cpp62
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/SystemInfo.cpp7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java43
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java37
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java56
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java52
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java79
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java115
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java57
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java55
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java64
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java62
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java48
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java46
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java49
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/RootMessageLoggerImplTest.java86
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/StatusUpdateConfigurationTest.java73
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java206
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java202
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java262
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java125
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/TestBlankActor.java33
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java288
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLogger.java60
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLoggerTest.java102
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java258
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java68
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java79
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java69
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java57
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java60
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java114
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java31
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java28
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java20
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java45
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java2
-rw-r--r--qpid/java/build.deps1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java182
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java80
-rw-r--r--qpid/java/module.xml2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java5
-rw-r--r--qpid/python/mllib/__init__.py30
-rw-r--r--qpid/specs/amqp.0-10-qpid-errata.xml2
71 files changed, 3757 insertions, 187 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index d77bd9b7ca..37a9e9b4af 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -228,6 +228,8 @@ void SemanticState::record(const DeliveryRecord& delivery)
unacked.push_back(delivery);
}
+const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
+
SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const string& _name,
Queue::shared_ptr _queue,
@@ -255,7 +257,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
msgCredit(0),
byteCredit(0),
notifyEnabled(true),
- syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
+ syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
deliveryCount(0)
{}
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index e52274597e..6f1d42249d 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -57,6 +57,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) :
dtxImpl(s)
{}
+static const std::string TRUE("true");
+static const std::string FALSE("false");
void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
const string& alternateExchange,
@@ -67,8 +69,8 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_TYPE, type));
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? "true" : "false") ));
- params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? "true" : "false")));
+ params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? TRUE : FALSE) ));
+ params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? TRUE : FALSE)));
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
throw NotAllowedException(QPID_MSG("ACL denied exhange declare request from " << getConnection().getUserId()));
}
@@ -325,10 +327,10 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? "true" : "false") ));
- params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? "true" : "false")));
- params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? "true" : "false")));
- params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? "true" : "false")));
+ params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? TRUE : FALSE) ));
+ params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? TRUE : FALSE)));
+ params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? TRUE : FALSE)));
+ params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? TRUE : FALSE)));
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
throw NotAllowedException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
}
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp
index f5e79012d9..0878ae94b9 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp
@@ -29,6 +29,11 @@
#include <unistd.h>
+// This is a macro instead of a function because we don't want to
+// evaluate the MSG argument unless there is an error.
+#define CPG_CHECK(RESULT, MSG) \
+ if ((RESULT) != CPG_OK) throw Exception(errorStr((RESULT), (MSG)))
+
namespace qpid {
namespace cluster {
@@ -36,7 +41,7 @@ using namespace std;
Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) {
void* cpg=0;
- check(cpg_context_get(handle, &cpg), "Cannot get CPG instance.");
+ CPG_CHECK(cpg_context_get(handle, &cpg), "Cannot get CPG instance.");
if (!cpg) throw Exception("Cannot get CPG instance.");
return reinterpret_cast<Cpg*>(cpg);
}
@@ -66,7 +71,7 @@ void Cpg::globalConfigChange(
int Cpg::getFd() {
int fd;
- check(cpg_fd_get(handle, &fd), "Cannot get CPG file descriptor");
+ CPG_CHECK(cpg_fd_get(handle, &fd), "Cannot get CPG file descriptor");
return fd;
}
@@ -84,8 +89,8 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow
sys::sleep(5);
err = cpg_initialize(&handle, &callbacks);
}
- check(err, "Failed to initialize CPG.");
- check(cpg_context_set(handle, this), "Cannot set CPG context");
+ CPG_CHECK(err, "Failed to initialize CPG.");
+ CPG_CHECK(cpg_context_set(handle, this), "Cannot set CPG context");
// Note: CPG is currently unix-specific. If CPG is ported to
// windows then this needs to be refactored into
// qpid::sys::<platform>
@@ -102,11 +107,11 @@ Cpg::~Cpg() {
void Cpg::join(const std::string& name) {
group = name;
- check(cpg_join(handle, &group), cantJoinMsg(group));
+ CPG_CHECK(cpg_join(handle, &group), cantJoinMsg(group));
}
void Cpg::leave() {
- check(cpg_leave(handle, &group), cantLeaveMsg(group));
+ CPG_CHECK(cpg_leave(handle, &group), cantLeaveMsg(group));
}
@@ -115,14 +120,14 @@ void Cpg::leave() {
bool Cpg::mcast(const iovec* iov, int iovLen) {
// Check for flow control
cpg_flow_control_state_t flowState;
- check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status.");
+ CPG_CHECK(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status.");
if (flowState == CPG_FLOW_CONTROL_ENABLED)
return false;
cpg_error_t result;
do {
result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
- if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group));
+ if (result != CPG_ERR_TRY_AGAIN) CPG_CHECK(result, cantMcastMsg(group));
} while(result == CPG_ERR_TRY_AGAIN);
return true;
}
@@ -131,20 +136,20 @@ void Cpg::shutdown() {
if (!isShutdown) {
QPID_LOG(debug,"Shutting down CPG");
isShutdown=true;
- check(cpg_finalize(handle), "Error in shutdown of CPG");
+ CPG_CHECK(cpg_finalize(handle), "Error in shutdown of CPG");
}
}
void Cpg::dispatchOne() {
- check(cpg_dispatch(handle,CPG_DISPATCH_ONE), "Error in CPG dispatch");
+ CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ONE), "Error in CPG dispatch");
}
void Cpg::dispatchAll() {
- check(cpg_dispatch(handle,CPG_DISPATCH_ALL), "Error in CPG dispatch");
+ CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ALL), "Error in CPG dispatch");
}
void Cpg::dispatchBlocking() {
- check(cpg_dispatch(handle,CPG_DISPATCH_BLOCKING), "Error in CPG dispatch");
+ CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_BLOCKING), "Error in CPG dispatch");
}
string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
@@ -184,7 +189,7 @@ std::string Cpg::cantMcastMsg(const Name& group) {
MemberId Cpg::self() const {
unsigned int nodeid;
- check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity");
+ CPG_CHECK(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity");
return MemberId(nodeid, getpid());
}
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h
index ac27a09ae6..624721b560 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.h
+++ b/qpid/cpp/src/qpid/cluster/Cpg.h
@@ -120,10 +120,6 @@ class Cpg : public sys::IOHandle {
static std::string cantLeaveMsg(const Name&);
static std::string cantMcastMsg(const Name&);
- static void check(cpg_error_t result, const std::string& msg) {
- if (result != CPG_OK) throw Exception(errorStr(result, msg));
- }
-
static Cpg* cpgFromHandle(cpg_handle_t);
static void globalDeliver(
diff --git a/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp
index e2cbff3908..062458ae5f 100755
--- a/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp
@@ -19,9 +19,6 @@
#include "qpid/sys/PipeHandle.h"
#include "qpid/sys/windows/check.h"
-#include <io.h>
-#include <fcntl.h>
-#include <errno.h>
#include <winsock2.h>
namespace qpid {
@@ -29,14 +26,53 @@ namespace sys {
PipeHandle::PipeHandle(bool nonBlocking) {
- int pair[2];
- pair[0] = pair[1] = -1;
+ SOCKET listener, pair[2];
+ struct sockaddr_in addr;
+ int err;
+ int addrlen = sizeof(addr);
+ pair[0] = pair[1] = INVALID_SOCKET;
+ if ((listener = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
- if (_pipe(pair, 128, O_BINARY) == -1)
- throw qpid::Exception(QPID_MSG("Creation of pipe failed"));
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ addr.sin_port = 0;
- writeFd = pair[0];
- readFd = pair[1];
+ err = bind(listener, (const struct sockaddr*) &addr, sizeof(addr));
+ if (err == SOCKET_ERROR) {
+ err = WSAGetLastError();
+ closesocket(listener);
+ throw QPID_WINDOWS_ERROR(err);
+ }
+
+ err = getsockname(listener, (struct sockaddr*) &addr, &addrlen);
+ if (err == SOCKET_ERROR) {
+ err = WSAGetLastError();
+ closesocket(listener);
+ throw QPID_WINDOWS_ERROR(err);
+ }
+
+ try {
+ if (listen(listener, 1) == SOCKET_ERROR)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ if ((pair[0] = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ if (connect(pair[0], (const struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ if ((pair[1] = accept(listener, NULL, NULL)) == INVALID_SOCKET)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+
+ closesocket(listener);
+ writeFd = pair[0];
+ readFd = pair[1];
+ }
+ catch (...) {
+ closesocket(listener);
+ if (pair[0] != INVALID_SOCKET)
+ closesocket(pair[0]);
+ throw;
+ }
// Set the socket to non-blocking
if (nonBlocking) {
@@ -46,16 +82,16 @@ PipeHandle::PipeHandle(bool nonBlocking) {
}
PipeHandle::~PipeHandle() {
- close(readFd);
- close(writeFd);
+ closesocket(readFd);
+ closesocket(writeFd);
}
int PipeHandle::read(void* buf, size_t bufSize) {
- return ::read(readFd, buf, bufSize);
+ return ::recv(readFd, (char *)buf, bufSize, 0);
}
int PipeHandle::write(const void* buf, size_t bufSize) {
- return ::write(writeFd, buf, bufSize);
+ return ::send(writeFd, (const char *)buf, bufSize, 0);
}
int PipeHandle::getReadHandle() {
diff --git a/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp
index 3e2fcb1517..ea53fc199c 100755
--- a/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp
@@ -181,18 +181,21 @@ uint32_t SystemInfo::getParentProcessId()
std::string SystemInfo::getProcessName()
{
+ std::string name;
+
// Only want info for the current process, so ask for something specific.
// The module info won't be used here but it keeps the snapshot limited to
// the current process so a search through all processes is not needed.
HANDLE snap = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, 0);
if (snap == INVALID_HANDLE_VALUE)
- return 0;
+ return name;
PROCESSENTRY32 entry;
entry.dwSize = sizeof(entry);
if (!Process32First(snap, &entry))
entry.szExeFile[0] = '\0';
CloseHandle(snap);
- return std::string(entry.szExeFile);
+ name = entry.szExeFile;
+ return name;
}
}} // namespace qpid::sys
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index e0d325a5b0..fc16b75e1a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -94,6 +94,7 @@ public class ServerConfiguration implements SignalHandler
envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer");
envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay");
envVarMap.put("QPID_ENABLEPOOLEDALLOCATOR", "advanced.enablePooledAllocator");
+ envVarMap.put("QPID_STATUS-UPDATES", "status-updates");
}
public ServerConfiguration(File configurationURL) throws ConfigurationException
@@ -186,7 +187,12 @@ public class ServerConfiguration implements SignalHandler
}
return conf;
}
-
+
+ public boolean getStatusEnabled()
+ {
+ return getConfig().getBoolean("status-updates", true);
+ }
+
// Our configuration class needs to make the interpolate method
// public so it can be called below from the config method.
private static class MyConfiguration extends CompositeConfiguration
@@ -541,4 +547,13 @@ public class ServerConfiguration implements SignalHandler
getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
DEFAULT_HOUSEKEEPING_PERIOD));
}
+
+ public boolean getStatusUpdates()
+ {
+ // Retrieve the setting from configuration but default to on.
+ String value = getConfig().getString("status-updates", "on");
+
+ return value.equalsIgnoreCase("on");
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 054674aed4..5d7adc6371 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -54,8 +54,11 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()
- );
+ final AMQChannel channel = new AMQChannel(session,channelId,
+ virtualHost.getMessageStore());
+
+
+
session.addChannel(channel);
ChannelOpenOkBody response;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java
new file mode 100644
index 0000000000..e9cc7449cd
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+public class BrokerMessages
+{
+
+ public static LogMessage BRK_1001(String version, String build)
+ {
+ return new LogMessage()
+ {
+
+ };
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
new file mode 100644
index 0000000000..203a5d160d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * LogActor the entity that is stored as in a ThreadLocal and used to perform logging.
+ *
+ * The actor is responsible for formatting its display name for the log entry.
+ *
+ * The actor performs the requested logging.
+ */
+public interface LogActor
+{
+ /**
+ * Logs the specified LogMessage about the LogSubject
+ *
+ * Currently logging has a global setting however this will later be revised and
+ * as such the LogActor will need to take into consideration any new configuration
+ * as a means of enabling the logging of LogActors and LogSubjects.
+ *
+ * @param subject The subject that is being logged
+ * @param message The message to log
+ */
+ public void message(LogSubject subject, LogMessage message);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java
new file mode 100644
index 0000000000..5c112ff100
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+public interface LogMessage
+{
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java
new file mode 100644
index 0000000000..e53ef364bf
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * Each LogSubject that wishes to be logged will implement this to provide their
+ * own display representation.
+ *
+ * The display representation is retrieved through the toString() method.
+ */
+public interface LogSubject
+{
+ /**
+ * Logs the message as provided by String.valueOf(message).
+ *
+ * @returns String the display representation of this LogSubject
+ */
+ public String toString();
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java
new file mode 100644
index 0000000000..7d515f3263
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * A RawMessage Logger takes the given String and any Throwable and writes the
+ * data to its resource.
+ */
+public interface RawMessageLogger
+{
+
+ /**
+ * Log the given message.
+ *
+ * @param message String to log.
+ */
+ public void rawMessage(String message);
+
+ /**
+ * Log the message and formatted stack trace for any Throwable.
+ *
+ * @param message String to log.
+ * @param throwable Throwable for which to provide stack trace.
+ */
+ public void rawMessage(String message, Throwable throwable);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
new file mode 100644
index 0000000000..cd7992faa7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * The RootMessageLogger is used by the LogActors to query if
+ * logging is enabled for the requested message and to provide the actual
+ * message that should be logged.
+ */
+public interface RootMessageLogger
+{
+ /**
+ * Determine if the LogSubject and the LogActor should be
+ * generating log messages.
+ *
+ * @param subject The subject of this log request
+ * @param actor The actor requesting the logging
+ * @return boolean true if the message should be logged.
+ */
+ boolean isMessageEnabled(LogActor actor, LogSubject subject);
+
+
+ /**
+ * Log the raw message to the configured logger.
+ *
+ * @param message The message to log
+ */
+ public void rawMessage(String message);
+
+ /**
+ * Log the raw message to the configured logger.
+ * Along with a formated stack trace from the Throwable.
+ *
+ * @param message The message to log
+ * @param throwable Optional Throwable that should provide stact trace
+ */
+ void rawMessage(String message, Throwable throwable);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
new file mode 100644
index 0000000000..9270c316b6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+import org.apache.qpid.server.configuration.ServerConfiguration;
+
+public class RootMessageLoggerImpl implements RootMessageLogger
+{
+ private boolean _enabled;
+
+ RawMessageLogger _rawLogger;
+ private static final String MESSAGE = "MESSAGE ";
+
+ public RootMessageLoggerImpl(ServerConfiguration configuration, RawMessageLogger rawLogger)
+ {
+ _enabled = configuration.getStatusUpdates();
+ _rawLogger = rawLogger;
+ }
+
+ public boolean isMessageEnabled(LogActor actor, LogSubject subject)
+ {
+ return _enabled;
+ }
+
+ public void rawMessage(String message)
+ {
+ _rawLogger.rawMessage(MESSAGE + message);
+ }
+
+ public void rawMessage(String message, Throwable throwable)
+ {
+ _rawLogger.rawMessage(MESSAGE + message, throwable);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
new file mode 100644
index 0000000000..f4e2793e8b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.text.MessageFormat;
+
+/**
+ * An AMQPChannelActor represtents a connection through the AMQP port with an
+ * associated Channel.
+ *
+ * <p/>
+ * This is responsible for correctly formatting the LogActor String in the log
+ * <p/>
+ * [con:1(user@127.0.0.1/)/ch:1]
+ * <p/>
+ * To do this it requires access to the IO Layers as well as a Channel
+ */
+public class AMQPChannelActor extends AbstractActor
+{
+
+ /**
+ * Create a new ChannelActor
+ *
+ * @param channel The Channel for this LogActor
+ * @param rootLogger The root Logger that this LogActor should use
+ */
+ public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ AMQProtocolSession session = channel.getProtocolSession();
+
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ChannelLogSubject.CHANNEL_FORMAT :
+ * con:{0}({1}@{2}/{3})/ch:{4}
+ *
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ _logString = "[" + MessageFormat.format(ChannelLogSubject.CHANNEL_FORMAT,
+ session.getSessionID(),
+ session.getPrincipal().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName(),
+ channel.getChannelId())
+ + "] ";
+ }
+}
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
new file mode 100644
index 0000000000..d459fc0f06
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+import java.text.MessageFormat;
+
+/**
+ * An AMQPConnectionActor represtents a connectionthrough the AMQP port.
+ * <p/>
+ * This is responsible for correctly formatting the LogActor String in the log
+ * <p/>
+ * [ con:1(user@127.0.0.1/) ]
+ * <p/>
+ * To do this it requires access to the IO Layers.
+ */
+public class AMQPConnectionActor extends AbstractActor
+{
+ /**
+ * 0 - Connection ID
+ * 1 - Remote Address
+ */
+ public static String SOCKET_FORMAT = "con:{0}({1})";
+
+ /**
+ * LOG FORMAT for the ConnectionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ */
+ public static final String USER_FORMAT = "con:{0}({1}@{2})";
+
+ public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ _logString = "[" + MessageFormat.format(SOCKET_FORMAT,
+ session.getSessionID(),
+ session.getRemoteAddress())
+
+ + "] ";
+ }
+
+ /**
+ * Call when the connection has been authorized so that the logString
+ * can be updated with the new user identity.
+ *
+ * @param session the authorized session
+ */
+ public void connectionAuthorized(AMQProtocolSession session)
+ {
+ _logString = "[" + MessageFormat.format(USER_FORMAT,
+ session.getSessionID(),
+ session.getPrincipal().getName(),
+ session.getRemoteAddress())
+ + "] ";
+
+ }
+
+ /**
+ * Called once the user has been authenticated and they are now selecting
+ * the virtual host they wish to use.
+ *
+ * @param session the session that now has a virtualhost associated with it.
+ */
+ public void virtualHostSelected(AMQProtocolSession session)
+ {
+
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ConnectionLogSubject.CONNECTION_FORMAT :
+ * con:{0}({1}@{2}/{3})
+ *
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ _logString = "[" + MessageFormat.format(ConnectionLogSubject.CONNECTION_FORMAT,
+ session.getSessionID(),
+ session.getPrincipal().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName())
+ + "] ";
+
+ }
+}
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
new file mode 100644
index 0000000000..95f2dc9ff6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+
+public abstract class AbstractActor implements LogActor
+{
+ protected String _logString;
+ protected RootMessageLogger _rootLogger;
+
+ public AbstractActor(RootMessageLogger rootLogger)
+ {
+ _rootLogger = rootLogger;
+ }
+
+ public void message(LogSubject subject, LogMessage message)
+ {
+ if (_rootLogger.isMessageEnabled(this, subject))
+ {
+ _rootLogger.rawMessage(_logString + String.valueOf(subject) + message);
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
new file mode 100644
index 0000000000..221e57eebb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.LogActor;
+
+import java.util.LinkedList;
+import java.util.Deque;
+
+public class CurrentActor
+{
+ private static final ThreadLocal<Deque<LogActor>> _currentActor = new ThreadLocal<Deque<LogActor>>()
+ {
+ protected Deque<LogActor> initialValue()
+ {
+ return new LinkedList<LogActor>();
+ }
+ };
+
+ public static void set(LogActor actor)
+ {
+ Deque<LogActor> stack = _currentActor.get();
+ stack.addFirst(actor);
+ }
+
+ public static void remove()
+ {
+ Deque<LogActor> stack = _currentActor.get();
+ stack.remove();
+ }
+
+ public static LogActor get()
+ {
+ return _currentActor.get().peek();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
new file mode 100644
index 0000000000..58d55a13bb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.RootMessageLogger;
+
+import java.text.MessageFormat;
+import java.security.Principal;
+
+public class ManagementActor extends AbstractActor
+{
+
+ /**
+ * LOG FORMAT for the ManagementActor,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ */
+ public static final String MANAGEMENT_FORMAT = "mng:{0}({1}@{2})";
+
+ /**
+ * //todo Correct interface to provide connection details
+ * @param user
+ * @param rootLogger The RootLogger to use for this Actor
+ */
+ public ManagementActor(Principal user, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ _logString = "["+ MessageFormat.format(MANAGEMENT_FORMAT,
+ "<MNG:ConnectionID>",
+ user.getName(),
+ "<MNG:RemoteAddress>")
+ + "] ";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
new file mode 100644
index 0000000000..3774155626
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.rawloggers;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.RawMessageLogger;
+
+public class Log4jMessageLogger implements RawMessageLogger
+{
+ public static final String DEFAULT_LEVEL = "INFO";
+ public static final String DEFAULT_LOGGER = "qpid.message";
+ private Level _level;
+ private Logger _rawMessageLogger;
+
+ public Log4jMessageLogger()
+ {
+ this(DEFAULT_LEVEL, DEFAULT_LOGGER);
+ }
+
+ public Log4jMessageLogger(String level, String logger)
+ {
+ _level = Level.toLevel(level);
+
+ _rawMessageLogger = Logger.getLogger(logger);
+ }
+
+ public void rawMessage(String message)
+ {
+ rawMessage(message, null);
+ }
+
+ public void rawMessage(String message, Throwable throwable)
+ {
+ _rawMessageLogger.log(_level, message, throwable);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
new file mode 100644
index 0000000000..4fb5bdcc93
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.logging.LogSubject;
+
+import java.text.MessageFormat;
+
+/**
+ * The LogSubjects all have a similar requriement to format their output and
+ * provide the String value.
+ *
+ * This Abstract LogSubject provides this basic functionality, allowing the
+ * actual LogSubjects to provide their formating and data.
+ */
+public abstract class AbstractLogSubject implements LogSubject
+{
+ /**
+ * The logString that will be returned via toString
+ */
+ protected String logString;
+
+ /**
+ * Set the toString logging of this LogSubject. Based on a format provided
+ * by format and the var args.
+ * @param format The Message to format
+ * @param args The values to put in to the message.
+ */
+ protected void setLogStringWithFormat(String format, Object... args)
+ {
+ logString = "[" + MessageFormat.format(format, args) + "] ";
+ }
+
+ /**
+ * ToString is how the Logging infrastructure will get the text for this
+ * LogSubject
+ *
+ * @return String representing this LogSubject
+ */
+ @Override
+ public String toString()
+ {
+ return logString;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
new file mode 100644
index 0000000000..fd171fea5a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class BindingLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ChannelLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost Name
+ * 1 - Exchange Type
+ * 2 - Exchange Name
+ * 3 - Queue Name
+ * 4 - Binding RoutingKey
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})/qu({3})/rk({4})";
+
+ /**
+ * Create a BindingLogSubject that Logs in the following format.
+ *
+ * [ vh(/)/ex(amq.direct)/qu(testQueue)/bd(testQueue) ]
+ *
+ * @param routingKey
+ * @param exchange
+ * @param queue
+ */
+ public BindingLogSubject(AMQShortString routingKey, Exchange exchange,
+ AMQQueue queue)
+ {
+ setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(),
+ exchange.getType(),
+ exchange.getName(),
+ queue.getName(),
+ routingKey);
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
new file mode 100644
index 0000000000..03afd0b772
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+public class ChannelLogSubject extends AbstractLogSubject
+{
+ /**
+ * LOG FORMAT for the ChannelLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ * 4 - Channel ID
+ */
+ public static String CHANNEL_FORMAT = ConnectionLogSubject.CONNECTION_FORMAT
+ + "/ch:{4}";
+
+ public ChannelLogSubject(AMQChannel channel)
+ {
+ AMQProtocolSession session = channel.getProtocolSession();
+
+ // Provide the value for the 4th replacement.
+ setLogStringWithFormat(CHANNEL_FORMAT,
+ session.getSessionID(),
+ session.getPrincipal().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName(),
+ channel.getChannelId());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
new file mode 100644
index 0000000000..65d65a24d2
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+/** The Connection LogSubject */
+public class ConnectionLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ConnectionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ public static final String CONNECTION_FORMAT = "con:{0}({1}@{2}/{3})";
+
+ public ConnectionLogSubject(AMQProtocolSession session)
+ {
+ setLogStringWithFormat(CONNECTION_FORMAT, session.getSessionID(),
+ session.getPrincipal().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java
new file mode 100644
index 0000000000..21e5f5e4ce
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ExchangeLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ExchangeLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost Name
+ * 1 - Exchange Type
+ * 2 - Exchange Name
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})";
+
+ /** Create an ExchangeLogSubject that Logs in the following format. */
+ public ExchangeLogSubject(Exchange exchange, VirtualHost vhost)
+ {
+ setLogStringWithFormat(BINDING_FORMAT, vhost.getName(),
+ exchange.getType(), exchange.getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
new file mode 100644
index 0000000000..89f31ef477
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class QueueLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ExchangeLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost name
+ * 1 - queue name
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/qu({1})";
+
+ /** Create an QueueLogSubject that Logs in the following format. */
+ public QueueLogSubject(AMQQueue queue)
+ {
+ setLogStringWithFormat(BINDING_FORMAT,
+ queue.getVirtualHost().getName(),
+ queue.getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java
new file mode 100644
index 0000000000..b68ef2e9a9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.subscription.Subscription;
+
+public class SubscriptionLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the SubscriptionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Subscription ID
+ * 1 - queue name
+ */
+ protected static String BINDING_FORMAT = "sub:{0}(qu({1}))";
+
+ /**
+ * Create an QueueLogSubject that Logs in the following format.
+ *
+ * @param subscription
+ */
+ public SubscriptionLogSubject(Subscription subscription)
+ {
+
+ setLogStringWithFormat(BINDING_FORMAT, subscription.getSubscriptionID(),
+ subscription.getQueue().getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 2db16ef751..5f1615351b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -21,26 +21,39 @@
package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
-
+import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.CloseFuture;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
-
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -54,7 +67,6 @@ import org.apache.qpid.transport.Sender;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
-
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.Principal;
@@ -64,6 +76,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
@@ -71,6 +84,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
@@ -120,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
private org.apache.mina.common.WriteFuture _lastWriteFuture;
+ // Create a simple ID that increments for ever new Session
+ private final long _sessionID = idGenerator.getAndIncrement();
+
+ private AMQPConnectionActor _actor;
+
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -134,6 +154,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_codecFactory = codecFactory;
+ _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
+
try
{
IoServiceConfig config = session.getServiceConfig();
@@ -158,6 +180,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_codecFactory = codecFactory;
+ _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -183,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
+ public long getSessionID()
+ {
+ return _sessionID;
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -235,6 +263,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
+ CurrentActor.set(_actor);
try
{
body.handle(channelId, this);
@@ -244,7 +273,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
closeChannel(channelId);
throw e;
}
-
+ finally
+ {
+ CurrentActor.remove();
+ }
}
private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -796,6 +828,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_virtualHost = virtualHost;
+ _actor.virtualHostSelected(this);
+
_virtualHost.getConnectionRegistry().registerConnection(this);
_managedObject = createMBean();
@@ -820,6 +854,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void setAuthorizedID(Principal authorizedID)
{
_authorizedID = authorizedID;
+
+ // Let the actor know that this connection is now Authorized
+ _actor.connectionAuthorized(this);
}
public Principal getPrincipal()
@@ -827,6 +864,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _authorizedID;
}
+ public SocketAddress getRemoteAddress()
+ {
+ return _minaProtocolSession.getRemoteAddress();
+ }
+
public MethodRegistry getMethodRegistry()
{
return MethodRegistry.getMethodRegistry(getProtocolVersion());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 1e8dd9f77a..b15e6a4219 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -37,6 +37,7 @@ import java.security.Principal;
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder
{
+ long getSessionID();
public static final class ProtocolSessionIdentifier
{
@@ -196,6 +197,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin
void setAuthorizedID(Principal authorizedID);
+ public java.net.SocketAddress getRemoteAddress();
+
public MethodRegistry getMethodRegistry();
public MethodDispatcher getMethodDispatcher();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 1fce03b1c3..8c24acccbf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -274,6 +274,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
{
+
exchange.registerQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
@@ -281,6 +282,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
_bindings.addBinding(routingKey, arguments, exchange);
+// ExchangeBinding binding = new ExchangeBinding(routingKey, exchange, arguments);
+
+ //fixme MR logging in progress
+// _bindings.addBinding(binding);
+//
+// if (_logger.isMessageEnabled(binding))
+// {
+// _logger.message(binding, "QM-1001 : Created Binding");
+// }
}
public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 22b4623ae1..b58b849133 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLogger;
/**
* An abstract application registry that provides access to configuration information and handles the
@@ -70,6 +71,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
protected PluginManager _pluginManager;
+ protected RootMessageLogger _rootMessageLogger;
+
static
{
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -287,4 +290,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return _pluginManager;
}
+ public RootMessageLogger getRootMessageLogger()
+ {
+ return _rootMessageLogger;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index 39164883f9..31a85b878a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -33,6 +33,8 @@ import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalD
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
@@ -44,9 +46,12 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
+ _rootMessageLogger = new RootMessageLoggerImpl(_configuration,
+ new Log4jMessageLogger());
+
initialiseManagedObjectRegistry();
- _virtualHostRegistry = new VirtualHostRegistry();
+ _virtualHostRegistry = new VirtualHostRegistry(this);
_pluginManager = new PluginManager(_configuration.getPluginDirectory());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index bbfda3addc..7d17639f22 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.mina.common.IoAcceptor;
public interface IApplicationRegistry
@@ -69,6 +70,8 @@ public interface IApplicationRegistry
PluginManager getPluginManager();
+ RootMessageLogger getRootMessageLogger();
+
/**
* Register any acceptors for this registry
* @param bindAddress The address that the acceptor has been bound with
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 87f4fd5c7c..2053082fb2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -50,6 +50,8 @@ public interface Subscription
AMQShortString getConsumerTag();
+ long getSubscriptionID();
+
boolean isSuspended();
boolean hasInterest(QueueEntry msg);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 68581acc14..26a3e9bebc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -69,6 +70,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final Lock _stateChangeLock;
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
+
+
static final class BrowserSubscription extends SubscriptionImpl
{
public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
@@ -535,6 +541,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return _consumerTag;
}
+ public long getSubscriptionID()
+ {
+ return _subscriptionID;
+ }
+
public AMQProtocolSession getProtocolSession()
{
return _channel.getProtocolSession();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index eda2d3a94e..9ef1e029d3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -20,17 +20,12 @@
*/
package org.apache.qpid.server.util;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Properties;
-
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.MapConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -41,6 +36,10 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
public class NullApplicationRegistry extends ApplicationRegistry
{
public NullApplicationRegistry() throws ConfigurationException
@@ -51,9 +50,11 @@ public class NullApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
_logger.info("Initialising NullApplicationRegistry");
-
+
+ _rootMessageLogger = new RootMessageLoggerImpl(_configuration, new Log4jMessageLogger());
+
_configuration.setHousekeepingExpiredMessageCheckPeriod(200);
-
+
Properties users = new Properties();
users.put("guest", "guest");
@@ -65,7 +66,7 @@ public class NullApplicationRegistry extends ApplicationRegistry
_authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
_managedObjectRegistry = new NoopManagedObjectRegistry();
- _virtualHostRegistry = new VirtualHostRegistry();
+ _virtualHostRegistry = new VirtualHostRegistry(this);
PropertiesConfiguration vhostProps = new PropertiesConfiguration();
VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
VirtualHost dummyHost = new VirtualHost(hostConfig);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
index 27917fac8a..5543adbeb5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -32,6 +35,12 @@ public class VirtualHostRegistry
private String _defaultVirtualHostName;
+ private ApplicationRegistry _applicationRegistry;
+
+ public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
+ {
+ _applicationRegistry = applicationRegistry;
+ }
public synchronized void registerVirtualHost(VirtualHost host) throws Exception
{
@@ -67,4 +76,9 @@ public class VirtualHostRegistry
{
return new ArrayList<VirtualHost>(_registry.values());
}
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ return _applicationRegistry;
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/RootMessageLoggerImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/RootMessageLoggerImplTest.java
new file mode 100644
index 0000000000..012a590687
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/RootMessageLoggerImplTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+
+import java.util.List;
+
+public class RootMessageLoggerImplTest extends TestCase
+{
+
+ RootMessageLogger _rootLogger;
+ UnitTestMessageLogger _rawLogger;
+
+ public void setUp() throws ConfigurationException
+ {
+ Configuration config = new PropertiesConfiguration();
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+
+ _rootLogger = new RootMessageLoggerImpl(serverConfig, _rawLogger);
+ }
+
+ public void tearDown()
+ {
+ _rawLogger.clearLogMessages();
+ }
+
+ public void testLog()
+ {
+ String message = "test logging";
+
+ _rootLogger.rawMessage(message);
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 1, logs.size());
+
+ assertTrue(logs.get(0).toString().contains(message));
+ }
+
+ public void testLogWithThrowable()
+ {
+ String message = "test logging";
+ Exception exception = new Exception("Test");
+
+ _rootLogger.rawMessage(message, exception);
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 2, logs.size());
+
+ String loggedMessage = (String) logs.get(0);
+ assertTrue("Message not found in log:" + loggedMessage,
+ loggedMessage.contains(message));
+
+ Exception fromLog = (Exception) logs.get(1);
+ assertEquals(exception.getMessage(), fromLog.getMessage());
+ }
+
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/StatusUpdateConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/StatusUpdateConfigurationTest.java
new file mode 100644
index 0000000000..9a3c18bf99
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/StatusUpdateConfigurationTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+/**
+ * Set of test to validate the effects of the changes made to the
+ * ServerConfiguration to enable the enabling/disabling of status update
+ * messages.
+ *
+ * The default is to on.
+ */
+public class StatusUpdateConfigurationTest extends TestCase
+{
+
+ /**
+ * Validate that with no configuration the status updates will default to
+ * enabled.
+ * @throws org.apache.commons.configuration.ConfigurationException
+ * - if there was a problem in creating the configuratino
+ */
+ public void testEnabled() throws ConfigurationException
+ {
+
+ ServerConfiguration serverConfig = new ServerConfiguration(
+ new PropertiesConfiguration());
+
+ assertTrue("Status Updates not enabled as expected.",
+ serverConfig.getStatusUpdates());
+ }
+
+
+ /**
+ * Validate that through the config it is possible to disable status updates
+ * @throws org.apache.commons.configuration.ConfigurationException
+ * - if there was a problem in creating the configuratino
+ */
+ public void testUpdateControls() throws ConfigurationException
+ {
+
+ Configuration config = new PropertiesConfiguration();
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ config.setProperty("status-updates", "off");
+
+
+ assertFalse("Status Updates should not be enabled.",
+ serverConfig.getStatusUpdates());
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
new file mode 100644
index 0000000000..298e3bc22c
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
@@ -0,0 +1,206 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.AMQChannel;
+
+import java.util.List;
+
+/**
+ * Test : AMQPConnectionActorTest
+ * Validate the AMQPConnectionActor class.
+ *
+ * The test creates a new AMQPActor and then logs a message using it.
+ *
+ * The test then verifies that the logged message was the only one created and
+ * that the message contains the required message.
+ */
+public class AMQPChannelActorTest extends TestCase
+{
+
+ LogActor _amqpActor;
+ UnitTestMessageLogger _rawLogger;
+
+ public void setUp() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+
+ AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore());
+
+ _amqpActor = new AMQPChannelActor(channel, rootLogger);
+
+ }
+
+ public void tearDown()
+ {
+ _rawLogger.clearLogMessages();
+ }
+
+ /**
+ * Test that when logging on behalf of the channel
+ * The test sends a message then verifies that it entered the logs.
+ *
+ * The log message should be fully repalaced (no '{n}' values) and should
+ * contain the channel id ('/ch:1') identification.
+ */
+ public void testChannel()
+ {
+ final String message = "test logging";
+
+ _amqpActor.message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+ });
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 1, logs.size());
+
+ // Verify that the logged message is present in the output
+ assertTrue("Message was not found in log message:" + logs.get(0),
+ logs.get(0).toString().contains(message));
+
+ // Verify that the message has the correct type
+ assertTrue("Message contains the [con: prefix",
+ logs.get(0).toString().contains("[con:"));
+
+
+ // Verify that all the values were presented to the MessageFormatter
+ // so we will not end up with '{n}' entries in the log.
+ assertFalse("Verify that the string does not contain any '{'." + logs.get(0),
+ logs.get(0).toString().contains("{"));
+
+ // Verify that the logged message contains the 'ch:1' marker
+ assertTrue("Message was not logged as part of channel 1" + logs.get(0),
+ logs.get(0).toString().contains("/ch:1"));
+
+ }
+
+ public void testChannelLoggingOff() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ config.addProperty("status-updates", "OFF");
+
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+
+ AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore());
+
+ _amqpActor = new AMQPChannelActor(channel, rootLogger);
+
+ final String message = "test logging";
+
+ _amqpActor.message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+ });
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 0, logs.size());
+
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
new file mode 100644
index 0000000000..c220865864
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
@@ -0,0 +1,202 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogActor;
+
+import java.util.List;
+
+/**
+ * Test : AMQPConnectionActorTest
+ * Validate the AMQPConnectionActor class.
+ *
+ * The test creates a new AMQPActor and then logs a message using it.
+ *
+ * The test then verifies that the logged message was the only one created and
+ * that the message contains the required message.
+ */
+public class AMQPConnectionActorTest extends TestCase
+{
+
+ LogActor _amqpActor;
+ UnitTestMessageLogger _rawLogger;
+
+ public void setUp() throws ConfigurationException
+ {
+ Configuration config = new PropertiesConfiguration();
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+ _amqpActor = new AMQPConnectionActor(session, rootLogger);
+ }
+
+ public void tearDown()
+ {
+ _rawLogger.clearLogMessages();
+ }
+
+ /**
+ * Test the AMQPActor logging as a Connection level.
+ *
+ * The test sends a message then verifies that it entered the logs.
+ *
+ * The log message should be fully repalaced (no '{n}' values) and should
+ * not contain any channel identification.
+ *
+ */
+ public void testConnection()
+ {
+ final String message = "test logging";
+
+ _amqpActor.message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+ });
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 1, logs.size());
+
+ // Verify that the logged message is present in the output
+ assertTrue("Message was not found in log message",
+ logs.get(0).toString().contains(message));
+
+ // Verify that the message has the correct type
+ assertTrue("Message contains the [con: prefix",
+ logs.get(0).toString().contains("[con:"));
+
+ // Verify that all the values were presented to the MessageFormatter
+ // so we will not end up with '{n}' entries in the log.
+ assertFalse("Verify that the string does not contain any '{'.",
+ logs.get(0).toString().contains("{"));
+
+ // Verify that the logged message does not contains the 'ch:' marker
+ assertFalse("Message was logged with a channel identifier." + logs.get(0),
+ logs.get(0).toString().contains("/ch:"));
+ }
+
+
+
+ public void testConnectionLoggingOff() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ config.addProperty("status-updates", "OFF");
+
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+
+ _amqpActor = new AMQPConnectionActor(session, rootLogger);
+
+ final String message = "test logging";
+
+ _amqpActor.message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+ });
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 0, logs.size());
+
+ }
+
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
new file mode 100644
index 0000000000..c1cc3253a8
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
@@ -0,0 +1,262 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+/**
+ * Test : CurrentActorTest
+ * Summary:
+ * Validate ThreadLocal operation.
+ *
+ * Test creates THREADS number of threads which all then execute the same test
+ * together ( as close as looping Thread.start() will allow).
+ *
+ * Test:
+ * Test sets the CurrentActor then proceeds to retrieve the value and use it.
+ *
+ * The test also validates that it is the same LogActor that this thread set.
+ *
+ * Finally the LogActor is removed and tested to make sure that it was
+ * successfully removed.
+ *
+ * By having a higher number of threads than would normally be used in the
+ * Poolling filter we aim to catch the race condition where a ThreadLocal remove
+ * is called before one or more threads call get(). This way we can ensure that
+ * the remove does not affect more than the Thread it was called in.
+ */
+public class CurrentActorTest extends TestCase
+{
+ //Set this to be a reasonably large number
+ int THREADS = 10;
+
+ // Record any exceptions that are thrown by the threads
+ final Exception[] _errors = new Exception[THREADS];
+
+ // Create a single session for this test.
+ AMQProtocolSession session;
+
+ public void setUp()
+ {
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+ }
+
+ public void testFIFO() throws AMQException
+ {
+ // Create a new actor using retrieving the rootMessageLogger from
+ // the default ApplicationRegistry.
+ //fixme reminder that we need a better approach for broker testing.
+ AMQPConnectionActor connectionActor = new AMQPConnectionActor(session,
+ ApplicationRegistry.getInstance().
+ getRootMessageLogger());
+
+ CurrentActor.set(connectionActor);
+
+ //Use the Actor to send a simple message
+ CurrentActor.get().message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[CurrentActorTest] ";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return "Connection Log Msg";
+ }
+ });
+
+ // Verify it was the same actor as we set earlier
+ assertEquals("Retrieved actor is not as expected ",
+ connectionActor, CurrentActor.get());
+
+ /**
+ * Set the actor to nwo be the Channel actor so testing the ability
+ * to push the actor on to the stack
+ */
+
+ AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore());
+
+ AMQPChannelActor channelActor = new AMQPChannelActor(channel,
+ ApplicationRegistry.getInstance().
+ getRootMessageLogger());
+
+ CurrentActor.set(channelActor);
+
+ //Use the Actor to send a simple message
+ CurrentActor.get().message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[CurrentActorTest] ";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return "Channel Log Msg";
+ }
+ });
+
+ // Verify it was the same actor as we set earlier
+ assertEquals("Retrieved actor is not as expected ",
+ channelActor, CurrentActor.get());
+
+ // Remove the ChannelActor from the stack
+ CurrentActor.remove();
+
+ // Verify we now have the same connection actor as we set earlier
+ assertEquals("Retrieved actor is not as expected ",
+ connectionActor, CurrentActor.get());
+
+ // Verify that removing the last actor returns us to a null value.
+ CurrentActor.remove();
+
+ assertNull("CurrentActor should be null", CurrentActor.get());
+
+ }
+
+ public void testThreadLocal()
+ {
+
+ // Setup the threads
+ Thread[] threads = new Thread[THREADS];
+ for (int count = 0; count < THREADS; count++)
+ {
+ Runnable test = new Test(count);
+ threads[count] = new Thread(test);
+ }
+
+ //Run the threads
+ for (int count = 0; count < THREADS; count++)
+ {
+ threads[count].start();
+ }
+
+ // Wait for them to finish
+ for (int count = 0; count < THREADS; count++)
+ {
+ try
+ {
+ threads[count].join();
+ }
+ catch (InterruptedException e)
+ {
+ //if we are interrupted then we will exit shortly.
+ }
+ }
+
+ // Verify that none of the tests threw an exception
+ for (int count = 0; count < THREADS; count++)
+ {
+ if (_errors[count] != null)
+ {
+ _errors[count].printStackTrace();
+ fail("Error occured in thread:" + count);
+ }
+ }
+ }
+
+ public class Test implements Runnable
+ {
+ int count;
+
+ Test(int count)
+ {
+ this.count = count;
+ }
+
+ public void run()
+ {
+
+ // Create a new actor using retrieving the rootMessageLogger from
+ // the default ApplicationRegistry.
+ //fixme reminder that we need a better approach for broker testing.
+ AMQPConnectionActor actor = new AMQPConnectionActor(session,
+ ApplicationRegistry.getInstance().
+ getRootMessageLogger());
+
+ CurrentActor.set(actor);
+
+ try
+ {
+ //Use the Actor to send a simple message
+ CurrentActor.get().message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[CurrentActorTest] ";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return "Running Thread:" + count;
+ }
+ });
+
+ // Verify it was the same actor as we set earlier
+ assertEquals("Retrieved actor is not as expected ",
+ actor, CurrentActor.get());
+
+ // Verify that removing the actor works for this thread
+ CurrentActor.remove();
+
+ assertNull("CurrentActor should be null", CurrentActor.get());
+ }
+ catch (Exception e)
+ {
+ _errors[count] = e;
+ }
+
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java
new file mode 100644
index 0000000000..fa0bb6529e
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+
+import java.security.Principal;
+import java.util.List;
+
+/**
+ * Test : AMQPConnectionActorTest
+ * Validate the AMQPConnectionActor class.
+ *
+ * The test creates a new AMQPActor and then logs a message using it.
+ *
+ * The test then verifies that the logged message was the only one created and
+ * that the message contains the required message.
+ */
+public class ManagementActorTest extends TestCase
+{
+
+ LogActor _amqpActor;
+ UnitTestMessageLogger _rawLogger;
+
+ public void setUp() throws ConfigurationException
+ {
+ Configuration config = new PropertiesConfiguration();
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+ _amqpActor = new ManagementActor(new Principal()
+ {
+ public String getName()
+ {
+ return "ManagementActorTest";
+ }
+ }, rootLogger);
+ }
+
+ public void tearDown()
+ {
+ _rawLogger.clearLogMessages();
+ }
+
+ /**
+ * Test the AMQPActor logging as a Connection level.
+ *
+ * The test sends a message then verifies that it entered the logs.
+ *
+ * The log message should be fully repalaced (no '{n}' values) and should
+ * not contain any channel identification.
+ */
+ public void testConnection()
+ {
+ final String message = "test logging";
+
+ _amqpActor.message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+ });
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 1, logs.size());
+
+ // Verify that the logged message is present in the output
+ assertTrue("Message was not found in log message",
+ logs.get(0).toString().contains(message));
+
+ // Verify that all the values were presented to the MessageFormatter
+ // so we will not end up with '{n}' entries in the log.
+ assertFalse("Verify that the string does not contain any '{'.",
+ logs.get(0).toString().contains("{"));
+
+ // Verify that the message has the correct type
+ assertTrue("Message contains the [mng: prefix",
+ logs.get(0).toString().contains("[mng:"));
+
+ // Verify that the logged message does not contains the 'ch:' marker
+ assertFalse("Message was logged with a channel identifier." + logs.get(0),
+ logs.get(0).toString().contains("/ch:"));
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/TestBlankActor.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/TestBlankActor.java
new file mode 100644
index 0000000000..ec84d8bc9b
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/TestBlankActor.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.RootMessageLogger;
+
+public class TestBlankActor extends AbstractActor
+{
+ public TestBlankActor(RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+ _logString = "[Blank]";
+ }
+}
+ \ No newline at end of file
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java
new file mode 100644
index 0000000000..d7a5aa667b
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.rawloggers;
+
+import junit.framework.TestCase;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/** Test that the Log4jMessageLogger defaults behave as expected */
+public class Log4jMessageLoggerTest extends TestCase
+{
+ private File _lodgfile;
+
+ Level _rootLevel;
+ Log4jTestAppender _appender;
+
+ @Override
+ public void setUp() throws IOException
+ {
+ // Setup a file for logging
+ _appender = new Log4jTestAppender();
+
+ Logger root = Logger.getRootLogger();
+ root.addAppender(_appender);
+
+ _rootLevel = Logger.getRootLogger().getLevel();
+ if (_rootLevel != Level.INFO)
+ {
+ root.setLevel(Level.INFO);
+ root.warn("Root Logger set to:" + _rootLevel + " Resetting to INFO for test.");
+ }
+ root.warn("Adding Test Appender:" + _appender);
+ }
+
+ @Override
+ public void tearDown()
+ {
+ Logger root = Logger.getRootLogger();
+ root.warn("Removing Test Appender:" + _appender);
+ root.warn("Resetting Root Level to : " + _rootLevel);
+
+ Logger.getRootLogger().setLevel(_rootLevel);
+
+ Logger.getRootLogger().removeAppender(_appender);
+
+ //Call close on our appender. This will clear the log messages
+ // from Memory
+ _appender.close();
+ }
+
+ /**
+ * Verify that the default configuraion of Log4jMessageLogger will
+ * log a message.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void testDefaultLogsMessage() throws IOException, InterruptedException
+ {
+ // Create a logger to test
+ Log4jMessageLogger logger = new Log4jMessageLogger();
+
+ //Create Message for test
+ String message = "testDefaults";
+
+ // Log the message
+ logger.rawMessage(message);
+
+ verifyLogPresent(message);
+ }
+
+ /**
+ * This test checks that if the Root Logger level is set such that the INFO
+ * messages would not be logged then the Log4jMessageLogger default of INFO
+ * will result in logging not being presented.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void testDefaultsLogsAtInfo() throws IOException, InterruptedException
+ {
+ // Create a logger to test
+ Log4jMessageLogger logger = new Log4jMessageLogger();
+
+ //Create Message for test
+ String message = "testDefaults";
+
+ //Set default logger level to off
+ Logger.getRootLogger().setLevel(Level.WARN);
+
+ // Log the message
+ logger.rawMessage(message);
+
+ verifyNoLog(message);
+ }
+
+ /**
+ * Test that changing the logger works.
+ * <p/>
+ * Test this by setting the default logger level to off which has been
+ * verified to work by test 'testDefaultsLevelObeyed'
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void testDefaultLoggerAdjustment() throws IOException, InterruptedException
+ {
+ String loggerName = "TestLogger";
+ // Create a logger to test
+ Log4jMessageLogger logger = new Log4jMessageLogger(Log4jMessageLogger.DEFAULT_LEVEL, loggerName);
+
+ //Create Message for test
+ String message = "testDefaults";
+
+ //Disable the default Log4jMessageLogger logger
+ Level originalLevel = Logger.getLogger(Log4jMessageLogger.DEFAULT_LOGGER).getLevel();
+ Logger.getLogger(Log4jMessageLogger.DEFAULT_LOGGER).setLevel(Level.OFF);
+
+ // Log the message
+ logger.rawMessage(message);
+
+ verifyLogPresent(message);
+
+ // Restore the logging level
+ Logger.getLogger(Log4jMessageLogger.DEFAULT_LOGGER).setLevel(originalLevel);
+ }
+
+ /**
+ * Test that changing the log level has an effect.
+ * Set the level to be debug
+ * but only set the logger to log at INFO
+ * there should be no data printed
+ * subsequently changing the root logger to allow DEBUG should
+ * show the message
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void testDefaultsLevelObeyed() throws IOException, InterruptedException
+ {
+ // Create a logger to test
+ Log4jMessageLogger logger = new Log4jMessageLogger("DEBUG", Log4jMessageLogger.DEFAULT_LOGGER);
+
+ //Create Message for test
+ String message = "testDefaults";
+
+ //Set root logger to INFO only
+ Logger.getRootLogger().setLevel(Level.INFO);
+
+ // Log the message
+ logger.rawMessage(message);
+
+ verifyNoLog(message);
+
+ //Set root logger to INFO only
+ Logger.getRootLogger().setLevel(Level.DEBUG);
+
+ // Log the message
+ logger.rawMessage(message);
+
+ verifyLogPresent(message);
+ }
+
+ /**
+ * Check that the Log Message reached log4j
+ * @param message the message to search for
+ */
+ private void verifyLogPresent(String message)
+ {
+ List<String> results = findMessageInLog(message);
+
+ //Validate we only got one message
+ assertEquals("The result set was not as expected.", 1, results.size());
+
+ // Validate message
+ String line = results.get(0);
+
+ assertNotNull("No Message retrieved from log file", line);
+ assertTrue("Message not contained in log.:" + line,
+ line.contains(message));
+ }
+
+ /**
+ * Check that the given Message is not present in the log4j records.
+ * @param message the message to search for
+ */
+ private void verifyNoLog(String message)
+ {
+ List<String> results = findMessageInLog(message);
+
+ //Validate we only got one message
+ if (results.size() > 0)
+ {
+ System.err.println("Unexpected Log messages");
+
+ for (String msg : results)
+ {
+ System.err.println(msg);
+ }
+ }
+
+ assertEquals("No messages expected.", 0, results.size());
+ }
+
+ /**
+ * Get the appenders list of events and return a list of all the messages
+ * that contain the given message
+ *
+ * @param message the search string
+ * @return The list of all logged messages that contain the search string.
+ */
+ private List<String> findMessageInLog(String message)
+ {
+ List<LoggingEvent> log = _appender.getLog();
+
+ // Search Results for requested message
+ List<String> result = new LinkedList<String>();
+
+ for (LoggingEvent event : log)
+ {
+ if (String.valueOf(event.getMessage()).contains(message))
+ {
+ result.add(String.valueOf(event.getMessage()));
+ }
+ }
+
+ return result;
+ }
+
+
+ /**
+ * Log4j Appender that simply records all the Logging Events so we can
+ * verify that the above logging will make it to log4j in a unit test.
+ */
+ private class Log4jTestAppender extends AppenderSkeleton
+ {
+ List<LoggingEvent> _log = new LinkedList<LoggingEvent>();
+
+ protected void append(LoggingEvent loggingEvent)
+ {
+ _log.add(loggingEvent);
+ }
+
+ public void close()
+ {
+ _log.clear();
+ }
+
+ /**
+ * @return the list of LoggingEvents that have occured in this Appender
+ */
+ public List<LoggingEvent> getLog()
+ {
+ return _log;
+ }
+
+ public boolean requiresLayout()
+ {
+ return false;
+ }
+ }
+}
+
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLogger.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLogger.java
new file mode 100644
index 0000000000..df50cfb57a
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLogger.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.rawloggers;
+
+import org.apache.qpid.server.logging.RawMessageLogger;
+
+import java.util.List;
+import java.util.LinkedList;
+
+public class UnitTestMessageLogger implements RawMessageLogger
+{
+ List<Object> _log;
+
+ public UnitTestMessageLogger()
+ {
+ _log = new LinkedList<Object>();
+ }
+
+
+ public void rawMessage(String message)
+ {
+ _log.add(message);
+ }
+
+ public void rawMessage(String message, Throwable throwable)
+ {
+ _log.add(message);
+ _log.add(throwable);
+ }
+
+
+ public List<Object> getLogMessages()
+ {
+ return _log;
+ }
+
+ public void clearLogMessages()
+ {
+ _log.clear();
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLoggerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLoggerTest.java
new file mode 100644
index 0000000000..e10de48432
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLoggerTest.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.rawloggers;
+
+import junit.framework.TestCase;
+
+import java.util.List;
+
+/**
+ * Test: UnitTestMessageLoggerTest
+ *
+ * This test verifies that UnitTestMessageLogger adhears to its interface.
+ *
+ * Messages are logged, and Throwables recorded in an array that can be
+ * retreived and cleared.
+ *
+ */
+public class UnitTestMessageLoggerTest extends TestCase
+{
+ private static final String TEST_MESSAGE = "Test";
+ private static final String TEST_THROWABLE = "Test Throwable";
+
+ public void testRawMessage()
+ {
+ UnitTestMessageLogger logger = new UnitTestMessageLogger();
+
+ assertEquals("Messages logged before test start", 0,
+ logger.getLogMessages().size());
+
+ // Log a message
+ logger.rawMessage(TEST_MESSAGE);
+
+ List<Object> messages = logger.getLogMessages();
+
+ assertEquals("Expected to have 1 messages logged", 1, messages.size());
+
+ assertEquals("First message not what was logged",
+ TEST_MESSAGE, messages.get(0));
+ }
+
+ public void testRawMessageWithThrowable()
+ {
+ UnitTestMessageLogger logger = new UnitTestMessageLogger();
+
+ assertEquals("Messages logged before test start", 0,
+ logger.getLogMessages().size());
+
+ // Log a message
+ Throwable throwable = new Throwable(TEST_THROWABLE);
+
+ logger.rawMessage(TEST_MESSAGE, throwable);
+
+ List<Object> messages = logger.getLogMessages();
+
+ assertEquals("Expected to have 2 entries", 2, messages.size());
+
+ assertEquals("Message text not what was logged",
+ TEST_MESSAGE, messages.get(0));
+
+ assertEquals("Message throwable not what was logged",
+ TEST_THROWABLE, ((Throwable) messages.get(1)).getMessage());
+
+ }
+
+ public void testClear()
+ {
+ UnitTestMessageLogger logger = new UnitTestMessageLogger();
+
+ assertEquals("Messages logged before test start", 0,
+ logger.getLogMessages().size());
+
+ // Log a message
+ logger.rawMessage(TEST_MESSAGE);
+
+ assertEquals("Expected to have 1 messages logged",
+ 1, logger.getLogMessages().size());
+
+ logger.clearLogMessages();
+
+ assertEquals("Expected to have no messagse after a clear",
+ 0, logger.getLogMessages().size());
+
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
new file mode 100644
index 0000000000..04081db8e3
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.actors.TestBlankActor;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.List;
+
+public abstract class AbstractTestLogSubject extends TestCase
+{
+ protected Configuration _config = new PropertiesConfiguration();
+ protected LogSubject _subject = null;
+
+ protected List<Object> performLog() throws ConfigurationException
+ {
+ if (_subject == null)
+ {
+ throw new NullPointerException("LogSubject has not been set");
+ }
+
+ ServerConfiguration serverConfig = new ServerConfiguration(_config);
+
+ UnitTestMessageLogger logger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, logger);
+
+ LogActor actor_actor = new TestBlankActor(rootLogger);
+
+ actor_actor.message(_subject, new LogMessage()
+ {
+ public String toString()
+ {
+ return "<Log Message>";
+ }
+ });
+
+ return logger.getLogMessages();
+ }
+
+ /**
+ * Verify that the connection section has the expected items
+ *
+ * @param connectionID - The connection id (int) to check for
+ * @param user - the Connected username
+ * @param ipString - the ipString/hostname
+ * @param vhost - the virtualhost that the user connected to.
+ * @param message - the message these values should appear in.
+ */
+ protected void verifyConnection(long connectionID, String user, String ipString, String vhost, String message)
+ {
+ // This should return us MockProtocolSessionUser@null/test
+ String connectionSlice = getSlice("con:" + connectionID, message);
+
+ assertNotNull("Unable to find connection 'con:" + connectionID + "'",
+ connectionSlice);
+
+ // Exract the userName
+ String[] userNameParts = connectionSlice.split("@");
+
+ assertEquals("Unable to split Username from rest of Connection:"
+ + connectionSlice, 2, userNameParts.length);
+
+ assertEquals("Username not as expected", userNameParts[0], user);
+
+ // Extract IP.
+ String[] ipParts = userNameParts[1].split("/");
+
+ assertEquals("Unable to split IP from rest of Connection:"
+ + userNameParts[1], 2, ipParts.length);
+
+ assertEquals("IP not as expected", ipParts[0], ipString);
+
+ //Finally check vhost
+ assertEquals("Virtualhost name not as expected.", vhost, ipParts[1]);
+ }
+
+ /**
+ * Verify that the RoutingKey is present in the provided message.
+ *
+ * @param message The message to check
+ * @param routingKey The routing key to check against
+ */
+ protected void verifyRoutingKey(String message, AMQShortString routingKey)
+ {
+ String routingKeySlice = getSlice("rk", message);
+
+ assertNotNull("Routing Key not found:" + message, routingKey);
+
+ assertEquals("Routing key not correct",
+ routingKey.toString(), routingKeySlice);
+ }
+
+ /**
+ * Verify that the given Queue's name exists in the provided message
+ *
+ * @param message The message to check
+ * @param queue The queue to check against
+ */
+ protected void verifyQueue(String message, AMQQueue queue)
+ {
+ String queueSlice = getSlice("qu", message);
+
+ assertNotNull("Queue not found:" + message, queueSlice);
+
+ assertEquals("Queue name not correct",
+ queue.getName().toString(), queueSlice);
+ }
+
+ /**
+ * Verify that the given exchange (name and type) are present in the
+ * provided message.
+ *
+ * @param message The message to check
+ * @param exchange the exchange to check against
+ */
+ protected void verifyExchange(String message, Exchange exchange)
+ {
+ String exchangeSilce = getSlice("ex", message);
+
+ assertNotNull("Exchange not found:" + message, exchangeSilce);
+
+ String[] exchangeParts = exchangeSilce.split("/");
+
+ assertEquals("Exchange should be in two parts ex(type/name)", 2,
+ exchangeParts.length);
+
+ assertEquals("Exchange type not correct",
+ exchange.getType().toString(), exchangeParts[0]);
+
+ assertEquals("Exchange name not correct",
+ exchange.getName().toString(), exchangeParts[1]);
+
+ }
+
+ /**
+ * Verify that a VirtualHost with the given name appears in the given
+ * message.
+ *
+ * @param message the message to search
+ * @param vhost the vhostName to check against
+ */
+ protected void verifyVirtualHost(String message, VirtualHost vhost)
+ {
+ String vhostSlice = getSlice("vh", message);
+
+ assertNotNull("Virtualhost not found:" + message, vhostSlice);
+
+ assertEquals("Virtualhost not correct", "/" + vhost.getName(), vhostSlice);
+ }
+
+ /**
+ * Parse the log message and return the slice according to the following:
+ * Given Example:
+ * con:1(guest@127.0.0.1/test)/ch:2/ex(amq.direct)/qu(myQueue)/rk(myQueue)
+ *
+ * Each item (except channel) is of the format <key>(<values>)
+ *
+ * So Given an ID to slice on:
+ * con:1 - Connection 1
+ * ex - exchange
+ * qu - queue
+ * rk - routing key
+ *
+ * @param sliceID the slice to locate
+ * @param message the message to search in
+ *
+ * @return the slice if found otherwise null is returned
+ */
+ protected String getSlice(String sliceID, String message)
+ {
+ int indexOfSlice = message.indexOf(sliceID + "(");
+
+ if (indexOfSlice == -1)
+ {
+ return null;
+ }
+
+ int endIndex = message.indexOf(')', indexOfSlice);
+
+ if (endIndex == -1)
+ {
+ return null;
+ }
+
+ return message.substring(indexOfSlice + 1 + sliceID.length(),
+ endIndex);
+ }
+
+ /**
+ * Test that when Logging occurs a single log statement is provided
+ *
+ * @throws ConfigurationException
+ */
+ public void testEnabled() throws ConfigurationException
+ {
+ List<Object> logs = performLog();
+
+ assertEquals("Log has to many messagse", 1, logs.size());
+
+ validateLogStatement(String.valueOf(logs.get(0)));
+ }
+
+ /**
+ * Call to the individiual tests to validate the message is formatted as
+ * expected
+ *
+ * @param message the message whos format needs validation
+ */
+ protected abstract void validateLogStatement(String message);
+
+ /**
+ * Ensure that when status-updates are off this does not perform logging
+ *
+ * @throws ConfigurationException
+ */
+ public void testDisabled() throws ConfigurationException
+ {
+ _config.addProperty("status-updates", "OFF");
+
+ List<Object> logs = performLog();
+
+ assertEquals("Log has to many messagse", 0, logs.size());
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
new file mode 100644
index 0000000000..845d02267f
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class BindingLogSubjectTest extends AbstractTestLogSubject
+{
+
+ AMQQueue _queue;
+ AMQShortString _routingKey;
+ Exchange _exchange;
+ VirtualHost _testVhost;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().
+ getVirtualHost("test");
+ // Configure items for subjectCreation
+ _routingKey = new AMQShortString("RoutingKey");
+ _exchange = _testVhost.getExchangeRegistry().getDefaultExchange();
+ _queue = new MockAMQQueue("BindingLogSubjectTest");
+ ((MockAMQQueue) _queue).setVirtualHost(_testVhost);
+
+ _subject = new BindingLogSubject(_routingKey, _exchange, _queue);
+ }
+
+ /**
+ * Validate that the logged Subject message is as expected:
+ * MESSAGE [Blank][vh(/test)/ex(direct/<<default>>)/qu(BindingLogSubjectTest)/rk(RoutingKey)] <Log Message>
+ * @param message the message whos format needs validation
+ */
+ @Override
+ protected void validateLogStatement(String message)
+ {
+ verifyVirtualHost(message, _testVhost);
+ verifyExchange(message, _exchange);
+ verifyQueue(message, _queue);
+ verifyRoutingKey(message, _routingKey);
+ }
+
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
new file mode 100644
index 0000000000..9d5cb70f4b
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ChannelLogSubjectTest extends ConnectionLogSubjectTest
+{
+ private final int _channelID = 1;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ _session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ _session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+ AMQChannel channel = new AMQChannel(_session, _channelID, _session.getVirtualHost().getMessageStore());
+
+ _subject = new ChannelLogSubject(channel);
+ }
+
+ /**
+ * MESSAGE [Blank][con:0(MockProtocolSessionUser@null/test)/ch:1] <Log Message>
+ *
+ * @param message the message whos format needs validation
+ */
+ protected void validateLogStatement(String message)
+ {
+ // Use the ConnectionLogSubjectTest to vaildate that the connection
+ // section is ok
+ super.validateLogStatement(message);
+
+ // Finally check that the channel identifier is correctly added
+ assertTrue("Channel 1 identifier not found as part of Subject",
+ message.contains(")/ch:1]"));
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
new file mode 100644
index 0000000000..ff2d9b5e11
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ConnectionLogSubjectTest extends AbstractTestLogSubject
+{
+ AMQProtocolSession _session;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ _session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ _session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+ _subject = new ConnectionLogSubject(_session);
+ }
+
+ /**
+ * MESSAGE [Blank][con:0(MockProtocolSessionUser@null/test)] <Log Message>
+ *
+ * @param message the message whos format needs validation
+ */
+ protected void validateLogStatement(String message)
+ {
+ verifyConnection(_session.getSessionID(), "MockProtocolSessionUser", "null", "test", message);
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
new file mode 100644
index 0000000000..35df4c5976
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.AMQShortString;
+
+public class ExchangeLogSubjectTest extends AbstractTestLogSubject
+{
+ Exchange _exchange;
+ VirtualHost _testVhost;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().
+ getVirtualHost("test");
+
+ _exchange = _testVhost.getExchangeRegistry().getDefaultExchange();
+ _subject = new ExchangeLogSubject(_exchange,_testVhost);
+ }
+
+ /**
+ * Validate that the logged Subject message is as expected:
+ * MESSAGE [Blank][vh(/test)/ex(direct/<<default>>)] <Log Message>
+ * @param message the message whos format needs validation
+ */
+ @Override
+ protected void validateLogStatement(String message)
+ {
+ verifyVirtualHost(message, _testVhost);
+ verifyExchange(message, _exchange);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
new file mode 100644
index 0000000000..7ef1f8d903
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class QueueLogSubjectTest extends AbstractTestLogSubject
+{
+
+ AMQQueue _queue;
+ VirtualHost _testVhost;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().
+ getVirtualHost("test");
+
+ _queue = new MockAMQQueue("QueueLogSubjectTest");
+ ((MockAMQQueue) _queue).setVirtualHost(_testVhost);
+
+ _subject = new QueueLogSubject(_queue);
+ }
+
+ /**
+ * Validate that the logged Subject message is as expected:
+ * MESSAGE [Blank][vh(/test)/qu(BindingLogSubjectTest)] <Log Message>
+ *
+ * @param message the message whos format needs validation
+ */
+ @Override
+ protected void validateLogStatement(String message)
+ {
+ System.err.println(message);
+ verifyVirtualHost(message, _testVhost);
+ verifyQueue(message, _queue);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java
new file mode 100644
index 0000000000..0b0b0d78d1
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactory;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class SubscriptionLogSubjectTest extends AbstractTestLogSubject
+{
+
+ AMQQueue _queue;
+ VirtualHost _testVhost;
+ private boolean _acks;
+ private FieldTable _filters;
+ private boolean _noLocal;
+ private int _channelID = 1;
+ Subscription _subscription;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().
+ getVirtualHost("test");
+
+ _queue = new MockAMQQueue("QueueLogSubjectTest");
+ ((MockAMQQueue) _queue).setVirtualHost(_testVhost);
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+ AMQChannel channel = new AMQChannel(session, _channelID, session.getVirtualHost().getMessageStore());
+
+ session.addChannel(channel);
+
+ SubscriptionFactory factory = new SubscriptionFactoryImpl();
+
+ _subscription = factory.createSubscription(_channelID, session, new AMQShortString("cTag"),
+ _acks, _filters, _noLocal,
+ new LimitlessCreditManager());
+
+ _subscription.setQueue(_queue);
+
+ _subject = new SubscriptionLogSubject(_subscription);
+ }
+
+ /**
+ * Validate that the logged Subject message is as expected:
+ * MESSAGE [Blank][sub:0(qu(QueueLogSubjectTest))] <Log Message>
+ *
+ * @param message the message whos format needs validation
+ */
+ @Override
+ protected void validateLogStatement(String message)
+ {
+ String subscriptionSlice = getSlice("sub:"
+ + _subscription.getSubscriptionID(),
+ message);
+
+ assertNotNull("Unable to locate subscription 'sub:" +
+ _subscription.getSubscriptionID() + "'");
+
+ // Adding the ')' is a bit ugly but SubscriptionLogSubject is the only
+ // Subject that nests () and so the simple parser of checking for the
+ // next ')' falls down.
+ verifyQueue(subscriptionSlice+")", _queue);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index d5db87350b..f09b03ab85 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -21,25 +21,22 @@
package org.apache.qpid.server.protocol;
import junit.framework.TestCase;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import javax.management.JMException;
+import java.security.Principal;
-/**
- * Test class to test MBean operations for AMQMinaProtocolSession.
- */
+/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class AMQProtocolSessionMBeanTest extends TestCase
{
/** Used for debugging. */
@@ -56,11 +53,11 @@ public class AMQProtocolSessionMBeanTest extends TestCase
int channelCount = _mbean.channels().size();
assertTrue(channelCount == 1);
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue_" + System.currentTimeMillis()),
- false,
- new AMQShortString("test"),
- true,
- _protocolSession.getVirtualHost(), null);
- AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore);
+ false,
+ new AMQShortString("test"),
+ true,
+ _protocolSession.getVirtualHost(), null);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
channelCount = _mbean.channels().size();
@@ -114,8 +111,16 @@ public class AMQProtocolSessionMBeanTest extends TestCase
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
_protocolSession =
- new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true),
- null);
+ new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true),
+ null);
+ // Need to authenticate session for it to work, (well for logging to work)
+ _protocolSession.setAuthorizedID(new Principal()
+ {
+ public String getName()
+ {
+ return "AMQProtocolSessionMBeanTestUser";
+ }
+ });
_protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
_channel = new AMQChannel(_protocolSession, 1, _messageStore);
_protocolSession.addChannel(_channel);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index da35ddc594..49c5f8a14b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -34,6 +34,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.net.SocketAddress;
public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
index 1bdabf345b..9597c1319a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
@@ -30,6 +30,8 @@ import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import java.security.Principal;
+
/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class MaxChannelsTest extends TestCase
{
@@ -40,6 +42,16 @@ public class MaxChannelsTest extends TestCase
{
_session = new AMQMinaProtocolSession(new TestIoSession(), _appRegistry
.getVirtualHostRegistry(), new AMQCodecFactory(true), null);
+
+ // Need to authenticate session for it to work, (well for logging to work)
+ _session.setAuthorizedID(new Principal()
+ {
+ public String getName()
+ {
+ return "AMQProtocolSessionMBeanTestUser";
+ }
+ });
+
_session.setVirtualHost(_appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
// check the channel count is correct
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index d7bd297998..bfbb3e19e5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -46,6 +46,7 @@ public class MockAMQQueue implements AMQQueue
{
private boolean _deleted = false;
private AMQShortString _name;
+ private VirtualHost _virtualhost;
private PrincipalHolder _principalHolder;
@@ -79,9 +80,14 @@ public class MockAMQQueue implements AMQQueue
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void setVirtualHost(VirtualHost virtualhost)
+ {
+ _virtualhost = virtualhost;
+ }
+
public VirtualHost getVirtualHost()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _virtualhost;
}
public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index ad6c95db8e..fe79c40bb9 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -34,7 +34,9 @@ import org.apache.qpid.transport.Sender;
import javax.security.sasl.SaslServer;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import java.security.Principal;
+import java.net.SocketAddress;
/**
* A protocol session that can be used for testing purposes.
@@ -45,11 +47,21 @@ public class MockProtocolSession implements AMQProtocolSession
private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+
+ private final long _sessionID = idGenerator.getAndIncrement();
+ private VirtualHost _virtualHost;
+
public MockProtocolSession(MessageStore messageStore)
{
_messageStore = messageStore;
}
+ public long getSessionID()
+ {
+ return _sessionID;
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
}
@@ -158,12 +170,12 @@ public class MockProtocolSession implements AMQProtocolSession
public VirtualHost getVirtualHost()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _virtualHost;
}
public void setVirtualHost(VirtualHost virtualHost)
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _virtualHost = virtualHost;
}
public void addSessionCloseTask(Task task)
@@ -188,6 +200,18 @@ public class MockProtocolSession implements AMQProtocolSession
public Principal getPrincipal()
{
+ return new Principal()
+ {
+ public String getName()
+ {
+ return "MockProtocolSessionUser";
+ }
+ };
+
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
index 83bcd03177..f56d562354 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
public class FirewallPluginTest extends TestCase
{
@@ -89,11 +90,13 @@ public class FirewallPluginTest extends TestCase
public void setUp() throws Exception
{
_store = new TestableMemoryMessageStore();
- PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
TestIoSession iosession = new TestIoSession();
iosession.setAddress("127.0.0.1");
- VirtualHostRegistry virtualHostRegistry = null;
+
+ // Retreive VirtualHost from the Registry
+ VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
+ _virtualHost = virtualHostRegistry.getVirtualHost("test");
+
AMQCodecFactory codecFactory = new AMQCodecFactory(true);
_session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index c7ea2067a6..94f7ff33a6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -21,10 +21,6 @@ package org.apache.qpid.server.subscription;
*
*/
-import java.util.ArrayList;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
@@ -32,6 +28,11 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
public class MockSubscription implements Subscription
{
@@ -44,6 +45,10 @@ public class MockSubscription implements Subscription
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
+
public void close()
{
_closed = true;
@@ -66,7 +71,12 @@ public class MockSubscription implements Subscription
public AMQShortString getConsumerTag()
{
- return tag ;
+ return tag;
+ }
+
+ public long getSubscriptionID()
+ {
+ return _subscriptionID;
}
public AMQQueue.Context getQueueContext()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 101c33043d..585ed9a538 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -21,30 +21,31 @@
package org.apache.qpid.server.util;
import junit.framework.TestCase;
-
import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.ConsumerTagNotUniqueException;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.util.MockChannel;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
+
+import java.security.Principal;
public class InternalBrokerBaseCase extends TestCase
{
@@ -64,7 +65,7 @@ public class InternalBrokerBaseCase extends TestCase
configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
_registry = new TestApplicationRegistry(new ServerConfiguration(configuration));
ApplicationRegistry.initialise(_registry);
- _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
+ _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
_messageStore = _virtualHost.getMessageStore();
@@ -80,6 +81,14 @@ public class InternalBrokerBaseCase extends TestCase
_session = new InternalTestProtocolSession();
+ _session.setAuthorizedID(new Principal()
+ {
+ public String getName()
+ {
+ return "InternalBrokerBaseCaseUser";
+ }
+ });
+
_session.setVirtualHost(_virtualHost);
_channel = new MockChannel(_session, 1, _messageStore);
@@ -176,7 +185,7 @@ public class InternalBrokerBaseCase extends TestCase
for (int count = 0; count < messages; count++)
{
- channel.setPublishFrame(info, _virtualHost.getExchangeRegistry().getExchange(info.getExchange()));
+ channel.setPublishFrame(info, _virtualHost.getExchangeRegistry().getExchange(info.getExchange()));
//Set the body size
ContentHeaderBody _headerBody = new ContentHeaderBody();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index c6ecac6a01..84bee7984b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -87,7 +87,7 @@ public class TestApplicationRegistry extends ApplicationRegistry
_messageStore = new TestableMemoryMessageStore();
- _virtualHostRegistry = new VirtualHostRegistry();
+ _virtualHostRegistry = new VirtualHostRegistry(this);
PropertiesConfiguration vhostProps = new PropertiesConfiguration();
VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
diff --git a/qpid/java/build.deps b/qpid/java/build.deps
index fa5e3da4db..709f2a478e 100644
--- a/qpid/java/build.deps
+++ b/qpid/java/build.deps
@@ -169,6 +169,7 @@ tools.test.libs=${client.test.libs}
testkit.test.libs=${test.libs}
management-client.test.libs=${muse.libs} ${test.libs} ${log4j} ${javassist} ${geronimo-servlet} ${commons-pool}
management-console.test.libs=${junit4} ${slf4j-log4j} ${log4j} ${client.libs}
+management-agent.test.libs=${junit}
management-eclipse-plugin.test.libs=${systests.libs}
broker-plugins.test.libs=${test.libs}
management-tools-qpid-cli.test.libs=${junit4} ${slf4j-log4j} ${log4j} ${client.libs}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
index 63222b50db..515c849290 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
@@ -20,7 +20,18 @@
*/
package org.apache.qpid.util;
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
/**
* FileUtils provides some simple helper methods for working with files. It follows the convention of wrapping all
@@ -46,7 +57,8 @@ public class FileUtils
{
BufferedInputStream is = null;
- try{
+ try
+ {
try
{
is = new BufferedInputStream(new FileInputStream(filename));
@@ -57,7 +69,9 @@ public class FileUtils
}
return readStreamAsString(is);
- }finally {
+ }
+ finally
+ {
if (is != null)
{
try
@@ -210,68 +224,69 @@ public class FileUtils
/*
* Deletes a given file
*/
- public static boolean deleteFile(String filePath)
- {
- return delete(new File(filePath), false);
- }
+ public static boolean deleteFile(String filePath)
+ {
+ return delete(new File(filePath), false);
+ }
/*
* Deletes a given empty directory
*/
- public static boolean deleteDirectory(String directoryPath)
- {
- File directory = new File(directoryPath);
-
- if (directory.isDirectory())
- {
- if (directory.listFiles().length == 0)
- {
- return delete(directory, true);
- }
- }
-
- return false;
- }
-
- /**
- * Delete a given file/directory,
- * A directory will always require the recursive flag to be set.
- * if a directory is specified and recursive set then delete the whole tree
- * @param file the File object to start at
- * @param recursive boolean to recurse if a directory is specified.
- * @return <code>true</code> if and only if the file or directory is
- * successfully deleted; <code>false</code> otherwise
- */
- public static boolean delete(File file, boolean recursive)
- {
- boolean success = true;
-
- if (file.isDirectory())
- {
- if (recursive)
- {
- File[] files = file.listFiles();
-
- // This can occur if the file is deleted outside the JVM
- if (files == null)
- {
- return false;
- }
-
- for (int i = 0; i < files.length; i++)
- {
- success = delete(files[i], true) && success;
- }
-
- return success && file.delete();
- }
-
- return false;
- }
-
- return file.delete();
- }
+ public static boolean deleteDirectory(String directoryPath)
+ {
+ File directory = new File(directoryPath);
+ if (directory.isDirectory())
+ {
+ if (directory.listFiles().length == 0)
+ {
+ return delete(directory, true);
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Delete a given file/directory,
+ * A directory will always require the recursive flag to be set.
+ * if a directory is specified and recursive set then delete the whole tree
+ *
+ * @param file the File object to start at
+ * @param recursive boolean to recurse if a directory is specified.
+ *
+ * @return <code>true</code> if and only if the file or directory is
+ * successfully deleted; <code>false</code> otherwise
+ */
+ public static boolean delete(File file, boolean recursive)
+ {
+ boolean success = true;
+
+ if (file.isDirectory())
+ {
+ if (recursive)
+ {
+ File[] files = file.listFiles();
+
+ // This can occur if the file is deleted outside the JVM
+ if (files == null)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < files.length; i++)
+ {
+ success = delete(files[i], true) && success;
+ }
+
+ return success && file.delete();
+ }
+
+ return false;
+ }
+
+ return file.delete();
+ }
public static class UnableToCopyException extends Exception
{
@@ -294,7 +309,6 @@ public class FileUtils
throw new IllegalArgumentException("Unable to copy '" + source.toString() + "' to '" + dst + "' a file with same name exists.");
}
-
if (source.isFile())
{
copy(source, dst);
@@ -303,22 +317,48 @@ public class FileUtils
//else we have a source directory
if (!dst.isDirectory() && !dst.mkdir())
{
- throw new UnableToCopyException("Unable to create destination directory");
+ throw new UnableToCopyException("Unable to create destination directory");
}
-
for (File file : source.listFiles())
{
- if (file.isFile())
- {
- copy(file, new File(dst.toString() + File.separator + file.getName()));
- }
- else
- {
- copyRecursive(file, new File(dst + File.separator + file.getName()));
- }
+ if (file.isFile())
+ {
+ copy(file, new File(dst.toString() + File.separator + file.getName()));
+ }
+ else
+ {
+ copyRecursive(file, new File(dst + File.separator + file.getName()));
+ }
}
+ }
+
+ /**
+ * Checks the specified file for instances of the search string.
+ *
+ * @param file the file to search
+ * @param search the search String
+ *
+ * @throws java.io.IOException
+ * @return the list of matching entries
+ */
+ public static List<String> searchFile(File file, String search)
+ throws IOException
+ {
+
+ List<String> results = new LinkedList<String>();
+
+ BufferedReader reader = new BufferedReader(new FileReader(file));
+ while (reader.ready())
+ {
+ String line = reader.readLine();
+ if (line.contains(search))
+ {
+ results.add(line);
+ }
+ }
+ return results;
}
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java b/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java
index 94e7e20a86..7eba5f092e 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java
@@ -22,11 +22,12 @@ package org.apache.qpid.util;
import junit.framework.TestCase;
-import java.io.File;
-import java.io.IOException;
import java.io.BufferedWriter;
-import java.io.FileWriter;
+import java.io.File;
import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
public class FileUtilsTest extends TestCase
{
@@ -47,7 +48,7 @@ public class FileUtilsTest extends TestCase
//Create initial file
File test = createTestFile(fileName, TEST_DATA);
-
+
try
{
//Check number of files before copy
@@ -137,7 +138,6 @@ public class FileUtilsTest extends TestCase
testSubDir.deleteOnExit();
testDir.deleteOnExit();
-
//Perform Copy
File copyDir = new File(testDir.toString() + COPY);
try
@@ -282,7 +282,7 @@ public class FileUtilsTest extends TestCase
public void testDeleteNonExistentFile()
{
- File test = new File("FileUtilsTest-testDelete-"+System.currentTimeMillis());
+ File test = new File("FileUtilsTest-testDelete-" + System.currentTimeMillis());
assertTrue("File exists", !test.exists());
assertFalse("File is a directory", test.isDirectory());
@@ -303,7 +303,6 @@ public class FileUtilsTest extends TestCase
}
}
-
/**
* Given two lists of File arrays ensure they are the same length and all entries in Before are in After
*
@@ -543,4 +542,71 @@ public class FileUtilsTest extends TestCase
}
}
+ public static final String SEARCH_STRING = "testSearch";
+
+ /**
+ * Test searchFile(File file, String search) will find a match when it
+ * exists.
+ *
+ * @throws java.io.IOException if unable to perform test setup
+ */
+ public void testSearchSucceed() throws IOException
+ {
+ File _logfile = File.createTempFile("FileUtilsTest-testSearchSucceed", ".out");
+
+ prepareFileForSearchTest(_logfile);
+
+ List<String> results = FileUtils.searchFile(_logfile, SEARCH_STRING);
+
+ assertNotNull("Null result set returned", results);
+
+ assertEquals("Results do not contain expected count", 1, results.size());
+ }
+
+ /**
+ * Test searchFile(File file, String search) will not find a match when the
+ * test string does not exist.
+ *
+ * @throws java.io.IOException if unable to perform test setup
+ */
+ public void testSearchFail() throws IOException
+ {
+ File _logfile = File.createTempFile("FileUtilsTest-testSearchFail", ".out");
+
+ prepareFileForSearchTest(_logfile);
+
+ List<String> results = FileUtils.searchFile(_logfile, "Hello");
+
+ assertNotNull("Null result set returned", results);
+
+ //Validate we only got one message
+ if (results.size() > 0)
+ {
+ System.err.println("Unexpected messages");
+
+ for (String msg : results)
+ {
+ System.err.println(msg);
+ }
+ }
+
+ assertEquals("Results contains data when it was not expected",
+ 0, results.size());
+ }
+
+ /**
+ * Write the SEARCH_STRING in to the given file.
+ *
+ * @param logfile The file to write the SEARCH_STRING into
+ *
+ * @throws IOException if an error occurs
+ */
+ private void prepareFileForSearchTest(File logfile) throws IOException
+ {
+ BufferedWriter writer = new BufferedWriter(new FileWriter(logfile));
+ writer.append(SEARCH_STRING);
+ writer.flush();
+ writer.close();
+ }
+
}
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index 5d0c0a9541..9bf0270bd6 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -373,7 +373,7 @@
<target name="libs-release" description="copy dependencies into module release">
<!-- Copy the module dependencies -->
- <copylist todir="${module.release}" dir="${project.root}" files="${module.libs}"/>
+ <copylist todir="${module.release}/lib" dir="${project.root}" files="${module.libs}"/>
<!-- Copy the jar for this module -->
<copy todir="${module.release}/lib" failonerror="true">
<fileset file="${module.jar}"/>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 737ed2322f..2216758c2e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -152,6 +152,11 @@ public class SubscriptionTestHelper implements Subscription
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public long getSubscriptionID()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isActive()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
diff --git a/qpid/python/mllib/__init__.py b/qpid/python/mllib/__init__.py
index 39e9363614..9aa1e56e66 100644
--- a/qpid/python/mllib/__init__.py
+++ b/qpid/python/mllib/__init__.py
@@ -24,6 +24,8 @@ both SGML and XML.
import os, dom, transforms, parsers, sys
import xml.sax, types
+from xml.sax.handler import ErrorHandler
+from xml.sax.xmlreader import InputSource
from cStringIO import StringIO
def transform(node, *args):
@@ -49,15 +51,33 @@ def sgml_parse(source):
p.close()
return p.parser.tree
-def xml_parse(filename):
+class Resolver:
+
+ def __init__(self, path):
+ self.path = path
+
+ def resolveEntity(self, publicId, systemId):
+ for p in self.path:
+ fname = os.path.join(p, systemId)
+ if os.path.exists(fname):
+ source = InputSource(systemId)
+ source.setByteStream(open(fname))
+ return source
+ return InputSource(systemId)
+
+def xml_parse(filename, path=()):
if sys.version_info[0:2] == (2,3):
# XXX: this is for older versions of python
- source = "file://%s" % os.path.abspath(filename)
+ source = "file://%s" % os.path.abspath(filename)
else:
source = filename
- p = parsers.XMLParser()
- xml.sax.parse(source, p)
- return p.parser.tree
+ h = parsers.XMLParser()
+ p = xml.sax.make_parser()
+ p.setContentHandler(h)
+ p.setErrorHandler(ErrorHandler())
+ p.setEntityResolver(Resolver(path))
+ p.parse(source)
+ return h.parser.tree
def sexp(node):
s = transforms.Sexp()
diff --git a/qpid/specs/amqp.0-10-qpid-errata.xml b/qpid/specs/amqp.0-10-qpid-errata.xml
index 5a554b0ab7..365928ea4e 100644
--- a/qpid/specs/amqp.0-10-qpid-errata.xml
+++ b/qpid/specs/amqp.0-10-qpid-errata.xml
@@ -121,7 +121,7 @@
-->
-<!--<!DOCTYPE amqp SYSTEM "amqp.0-10.dtd">-->
+<!DOCTYPE amqp SYSTEM "amqp.0-10.dtd">
<amqp xmlns="http://www.amqp.org/schema/amqp.xsd"
major="0" minor="10" port="5672">