diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-05 10:11:48 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-05 10:11:48 +0000 |
commit | b814e73fa07ec35dea45d58f9a2deaa44630edd9 (patch) | |
tree | 0adb70432e7617d238998034a59963795100f6c3 | |
parent | 2a40f697eaef97b68619eaf7491f1df7084a8754 (diff) | |
download | qpid-python-b814e73fa07ec35dea45d58f9a2deaa44630edd9.tar.gz |
Merged from trunk up to r796653
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@821735 13f79535-47bb-0310-9956-ffa450edef68
71 files changed, 3757 insertions, 187 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index d77bd9b7ca..37a9e9b4af 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -228,6 +228,8 @@ void SemanticState::record(const DeliveryRecord& delivery) unacked.push_back(delivery); } +const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); + SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const string& _name, Queue::shared_ptr _queue, @@ -255,7 +257,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, msgCredit(0), byteCredit(0), notifyEnabled(true), - syncFrequency(_arguments.getAsInt("qpid.sync_frequency")), + syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), deliveryCount(0) {} diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index e52274597e..6f1d42249d 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -57,6 +57,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) : dtxImpl(s) {} +static const std::string TRUE("true"); +static const std::string FALSE("false"); void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type, const string& alternateExchange, @@ -67,8 +69,8 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const std::map<acl::Property, std::string> params; params.insert(make_pair(acl::PROP_TYPE, type)); params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? "true" : "false") )); - params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? "true" : "false"))); + params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? TRUE : FALSE) )); + params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? TRUE : FALSE))); if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) ) throw NotAllowedException(QPID_MSG("ACL denied exhange declare request from " << getConnection().getUserId())); } @@ -325,10 +327,10 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& if (acl) { std::map<acl::Property, std::string> params; params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? "true" : "false") )); - params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? "true" : "false"))); - params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? "true" : "false"))); - params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? "true" : "false"))); + params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? TRUE : FALSE) )); + params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? TRUE : FALSE))); + params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? TRUE : FALSE))); + params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? TRUE : FALSE))); if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) throw NotAllowedException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId())); } diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index f5e79012d9..0878ae94b9 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -29,6 +29,11 @@ #include <unistd.h> +// This is a macro instead of a function because we don't want to +// evaluate the MSG argument unless there is an error. +#define CPG_CHECK(RESULT, MSG) \ + if ((RESULT) != CPG_OK) throw Exception(errorStr((RESULT), (MSG))) + namespace qpid { namespace cluster { @@ -36,7 +41,7 @@ using namespace std; Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) { void* cpg=0; - check(cpg_context_get(handle, &cpg), "Cannot get CPG instance."); + CPG_CHECK(cpg_context_get(handle, &cpg), "Cannot get CPG instance."); if (!cpg) throw Exception("Cannot get CPG instance."); return reinterpret_cast<Cpg*>(cpg); } @@ -66,7 +71,7 @@ void Cpg::globalConfigChange( int Cpg::getFd() { int fd; - check(cpg_fd_get(handle, &fd), "Cannot get CPG file descriptor"); + CPG_CHECK(cpg_fd_get(handle, &fd), "Cannot get CPG file descriptor"); return fd; } @@ -84,8 +89,8 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow sys::sleep(5); err = cpg_initialize(&handle, &callbacks); } - check(err, "Failed to initialize CPG."); - check(cpg_context_set(handle, this), "Cannot set CPG context"); + CPG_CHECK(err, "Failed to initialize CPG."); + CPG_CHECK(cpg_context_set(handle, this), "Cannot set CPG context"); // Note: CPG is currently unix-specific. If CPG is ported to // windows then this needs to be refactored into // qpid::sys::<platform> @@ -102,11 +107,11 @@ Cpg::~Cpg() { void Cpg::join(const std::string& name) { group = name; - check(cpg_join(handle, &group), cantJoinMsg(group)); + CPG_CHECK(cpg_join(handle, &group), cantJoinMsg(group)); } void Cpg::leave() { - check(cpg_leave(handle, &group), cantLeaveMsg(group)); + CPG_CHECK(cpg_leave(handle, &group), cantLeaveMsg(group)); } @@ -115,14 +120,14 @@ void Cpg::leave() { bool Cpg::mcast(const iovec* iov, int iovLen) { // Check for flow control cpg_flow_control_state_t flowState; - check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status."); + CPG_CHECK(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status."); if (flowState == CPG_FLOW_CONTROL_ENABLED) return false; cpg_error_t result; do { result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen); - if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group)); + if (result != CPG_ERR_TRY_AGAIN) CPG_CHECK(result, cantMcastMsg(group)); } while(result == CPG_ERR_TRY_AGAIN); return true; } @@ -131,20 +136,20 @@ void Cpg::shutdown() { if (!isShutdown) { QPID_LOG(debug,"Shutting down CPG"); isShutdown=true; - check(cpg_finalize(handle), "Error in shutdown of CPG"); + CPG_CHECK(cpg_finalize(handle), "Error in shutdown of CPG"); } } void Cpg::dispatchOne() { - check(cpg_dispatch(handle,CPG_DISPATCH_ONE), "Error in CPG dispatch"); + CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ONE), "Error in CPG dispatch"); } void Cpg::dispatchAll() { - check(cpg_dispatch(handle,CPG_DISPATCH_ALL), "Error in CPG dispatch"); + CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ALL), "Error in CPG dispatch"); } void Cpg::dispatchBlocking() { - check(cpg_dispatch(handle,CPG_DISPATCH_BLOCKING), "Error in CPG dispatch"); + CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_BLOCKING), "Error in CPG dispatch"); } string Cpg::errorStr(cpg_error_t err, const std::string& msg) { @@ -184,7 +189,7 @@ std::string Cpg::cantMcastMsg(const Name& group) { MemberId Cpg::self() const { unsigned int nodeid; - check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity"); + CPG_CHECK(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity"); return MemberId(nodeid, getpid()); } diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h index ac27a09ae6..624721b560 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.h +++ b/qpid/cpp/src/qpid/cluster/Cpg.h @@ -120,10 +120,6 @@ class Cpg : public sys::IOHandle { static std::string cantLeaveMsg(const Name&); static std::string cantMcastMsg(const Name&); - static void check(cpg_error_t result, const std::string& msg) { - if (result != CPG_OK) throw Exception(errorStr(result, msg)); - } - static Cpg* cpgFromHandle(cpg_handle_t); static void globalDeliver( diff --git a/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp index e2cbff3908..062458ae5f 100755 --- a/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp +++ b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp @@ -19,9 +19,6 @@ #include "qpid/sys/PipeHandle.h" #include "qpid/sys/windows/check.h" -#include <io.h> -#include <fcntl.h> -#include <errno.h> #include <winsock2.h> namespace qpid { @@ -29,14 +26,53 @@ namespace sys { PipeHandle::PipeHandle(bool nonBlocking) { - int pair[2]; - pair[0] = pair[1] = -1; + SOCKET listener, pair[2]; + struct sockaddr_in addr; + int err; + int addrlen = sizeof(addr); + pair[0] = pair[1] = INVALID_SOCKET; + if ((listener = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); - if (_pipe(pair, 128, O_BINARY) == -1) - throw qpid::Exception(QPID_MSG("Creation of pipe failed")); + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = 0; - writeFd = pair[0]; - readFd = pair[1]; + err = bind(listener, (const struct sockaddr*) &addr, sizeof(addr)); + if (err == SOCKET_ERROR) { + err = WSAGetLastError(); + closesocket(listener); + throw QPID_WINDOWS_ERROR(err); + } + + err = getsockname(listener, (struct sockaddr*) &addr, &addrlen); + if (err == SOCKET_ERROR) { + err = WSAGetLastError(); + closesocket(listener); + throw QPID_WINDOWS_ERROR(err); + } + + try { + if (listen(listener, 1) == SOCKET_ERROR) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + if ((pair[0] = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + if (connect(pair[0], (const struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + if ((pair[1] = accept(listener, NULL, NULL)) == INVALID_SOCKET) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + + closesocket(listener); + writeFd = pair[0]; + readFd = pair[1]; + } + catch (...) { + closesocket(listener); + if (pair[0] != INVALID_SOCKET) + closesocket(pair[0]); + throw; + } // Set the socket to non-blocking if (nonBlocking) { @@ -46,16 +82,16 @@ PipeHandle::PipeHandle(bool nonBlocking) { } PipeHandle::~PipeHandle() { - close(readFd); - close(writeFd); + closesocket(readFd); + closesocket(writeFd); } int PipeHandle::read(void* buf, size_t bufSize) { - return ::read(readFd, buf, bufSize); + return ::recv(readFd, (char *)buf, bufSize, 0); } int PipeHandle::write(const void* buf, size_t bufSize) { - return ::write(writeFd, buf, bufSize); + return ::send(writeFd, (const char *)buf, bufSize, 0); } int PipeHandle::getReadHandle() { diff --git a/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp index 3e2fcb1517..ea53fc199c 100755 --- a/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp +++ b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp @@ -181,18 +181,21 @@ uint32_t SystemInfo::getParentProcessId() std::string SystemInfo::getProcessName() { + std::string name; + // Only want info for the current process, so ask for something specific. // The module info won't be used here but it keeps the snapshot limited to // the current process so a search through all processes is not needed. HANDLE snap = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, 0); if (snap == INVALID_HANDLE_VALUE) - return 0; + return name; PROCESSENTRY32 entry; entry.dwSize = sizeof(entry); if (!Process32First(snap, &entry)) entry.szExeFile[0] = '\0'; CloseHandle(snap); - return std::string(entry.szExeFile); + name = entry.szExeFile; + return name; } }} // namespace qpid::sys diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index e0d325a5b0..fc16b75e1a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -94,6 +94,7 @@ public class ServerConfiguration implements SignalHandler envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer"); envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay"); envVarMap.put("QPID_ENABLEPOOLEDALLOCATOR", "advanced.enablePooledAllocator"); + envVarMap.put("QPID_STATUS-UPDATES", "status-updates"); } public ServerConfiguration(File configurationURL) throws ConfigurationException @@ -186,7 +187,12 @@ public class ServerConfiguration implements SignalHandler } return conf; } - + + public boolean getStatusEnabled() + { + return getConfig().getBoolean("status-updates", true); + } + // Our configuration class needs to make the interpolate method // public so it can be called below from the config method. private static class MyConfiguration extends CompositeConfiguration @@ -541,4 +547,13 @@ public class ServerConfiguration implements SignalHandler getConfig().getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD)); } + + public boolean getStatusUpdates() + { + // Retrieve the setting from configuration but default to on. + String value = getConfig().getString("status-updates", "on"); + + return value.equalsIgnoreCase("on"); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 054674aed4..5d7adc6371 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -54,8 +54,11 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore() - ); + final AMQChannel channel = new AMQChannel(session,channelId, + virtualHost.getMessageStore()); + + + session.addChannel(channel); ChannelOpenOkBody response; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java new file mode 100644 index 0000000000..e9cc7449cd --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +public class BrokerMessages +{ + + public static LogMessage BRK_1001(String version, String build) + { + return new LogMessage() + { + + }; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java new file mode 100644 index 0000000000..203a5d160d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +/** + * LogActor the entity that is stored as in a ThreadLocal and used to perform logging. + * + * The actor is responsible for formatting its display name for the log entry. + * + * The actor performs the requested logging. + */ +public interface LogActor +{ + /** + * Logs the specified LogMessage about the LogSubject + * + * Currently logging has a global setting however this will later be revised and + * as such the LogActor will need to take into consideration any new configuration + * as a means of enabling the logging of LogActors and LogSubjects. + * + * @param subject The subject that is being logged + * @param message The message to log + */ + public void message(LogSubject subject, LogMessage message); +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java new file mode 100644 index 0000000000..5c112ff100 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +public interface LogMessage +{ + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java new file mode 100644 index 0000000000..e53ef364bf --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +/** + * Each LogSubject that wishes to be logged will implement this to provide their + * own display representation. + * + * The display representation is retrieved through the toString() method. + */ +public interface LogSubject +{ + /** + * Logs the message as provided by String.valueOf(message). + * + * @returns String the display representation of this LogSubject + */ + public String toString(); +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java new file mode 100644 index 0000000000..7d515f3263 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +/** + * A RawMessage Logger takes the given String and any Throwable and writes the + * data to its resource. + */ +public interface RawMessageLogger +{ + + /** + * Log the given message. + * + * @param message String to log. + */ + public void rawMessage(String message); + + /** + * Log the message and formatted stack trace for any Throwable. + * + * @param message String to log. + * @param throwable Throwable for which to provide stack trace. + */ + public void rawMessage(String message, Throwable throwable); +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java new file mode 100644 index 0000000000..cd7992faa7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +/** + * The RootMessageLogger is used by the LogActors to query if + * logging is enabled for the requested message and to provide the actual + * message that should be logged. + */ +public interface RootMessageLogger +{ + /** + * Determine if the LogSubject and the LogActor should be + * generating log messages. + * + * @param subject The subject of this log request + * @param actor The actor requesting the logging + * @return boolean true if the message should be logged. + */ + boolean isMessageEnabled(LogActor actor, LogSubject subject); + + + /** + * Log the raw message to the configured logger. + * + * @param message The message to log + */ + public void rawMessage(String message); + + /** + * Log the raw message to the configured logger. + * Along with a formated stack trace from the Throwable. + * + * @param message The message to log + * @param throwable Optional Throwable that should provide stact trace + */ + void rawMessage(String message, Throwable throwable); +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java new file mode 100644 index 0000000000..9270c316b6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +import org.apache.qpid.server.configuration.ServerConfiguration; + +public class RootMessageLoggerImpl implements RootMessageLogger +{ + private boolean _enabled; + + RawMessageLogger _rawLogger; + private static final String MESSAGE = "MESSAGE "; + + public RootMessageLoggerImpl(ServerConfiguration configuration, RawMessageLogger rawLogger) + { + _enabled = configuration.getStatusUpdates(); + _rawLogger = rawLogger; + } + + public boolean isMessageEnabled(LogActor actor, LogSubject subject) + { + return _enabled; + } + + public void rawMessage(String message) + { + _rawLogger.rawMessage(MESSAGE + message); + } + + public void rawMessage(String message, Throwable throwable) + { + _rawLogger.rawMessage(MESSAGE + message, throwable); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java new file mode 100644 index 0000000000..f4e2793e8b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.text.MessageFormat; + +/** + * An AMQPChannelActor represtents a connection through the AMQP port with an + * associated Channel. + * + * <p/> + * This is responsible for correctly formatting the LogActor String in the log + * <p/> + * [con:1(user@127.0.0.1/)/ch:1] + * <p/> + * To do this it requires access to the IO Layers as well as a Channel + */ +public class AMQPChannelActor extends AbstractActor +{ + + /** + * Create a new ChannelActor + * + * @param channel The Channel for this LogActor + * @param rootLogger The root Logger that this LogActor should use + */ + public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger) + { + super(rootLogger); + + AMQProtocolSession session = channel.getProtocolSession(); + + /** + * LOG FORMAT used by the AMQPConnectorActor follows + * ChannelLogSubject.CHANNEL_FORMAT : + * con:{0}({1}@{2}/{3})/ch:{4} + * + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + */ + _logString = "[" + MessageFormat.format(ChannelLogSubject.CHANNEL_FORMAT, + session.getSessionID(), + session.getPrincipal().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName(), + channel.getChannelId()) + + "] "; + } +} + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java new file mode 100644 index 0000000000..d459fc0f06 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.protocol.AMQProtocolSession; + +import java.text.MessageFormat; + +/** + * An AMQPConnectionActor represtents a connectionthrough the AMQP port. + * <p/> + * This is responsible for correctly formatting the LogActor String in the log + * <p/> + * [ con:1(user@127.0.0.1/) ] + * <p/> + * To do this it requires access to the IO Layers. + */ +public class AMQPConnectionActor extends AbstractActor +{ + /** + * 0 - Connection ID + * 1 - Remote Address + */ + public static String SOCKET_FORMAT = "con:{0}({1})"; + + /** + * LOG FORMAT for the ConnectionLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + */ + public static final String USER_FORMAT = "con:{0}({1}@{2})"; + + public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger) + { + super(rootLogger); + + _logString = "[" + MessageFormat.format(SOCKET_FORMAT, + session.getSessionID(), + session.getRemoteAddress()) + + + "] "; + } + + /** + * Call when the connection has been authorized so that the logString + * can be updated with the new user identity. + * + * @param session the authorized session + */ + public void connectionAuthorized(AMQProtocolSession session) + { + _logString = "[" + MessageFormat.format(USER_FORMAT, + session.getSessionID(), + session.getPrincipal().getName(), + session.getRemoteAddress()) + + "] "; + + } + + /** + * Called once the user has been authenticated and they are now selecting + * the virtual host they wish to use. + * + * @param session the session that now has a virtualhost associated with it. + */ + public void virtualHostSelected(AMQProtocolSession session) + { + + /** + * LOG FORMAT used by the AMQPConnectorActor follows + * ConnectionLogSubject.CONNECTION_FORMAT : + * con:{0}({1}@{2}/{3}) + * + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + */ + _logString = "[" + MessageFormat.format(ConnectionLogSubject.CONNECTION_FORMAT, + session.getSessionID(), + session.getPrincipal().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName()) + + "] "; + + } +} + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java new file mode 100644 index 0000000000..95f2dc9ff6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.RootMessageLogger; + +public abstract class AbstractActor implements LogActor +{ + protected String _logString; + protected RootMessageLogger _rootLogger; + + public AbstractActor(RootMessageLogger rootLogger) + { + _rootLogger = rootLogger; + } + + public void message(LogSubject subject, LogMessage message) + { + if (_rootLogger.isMessageEnabled(this, subject)) + { + _rootLogger.rawMessage(_logString + String.valueOf(subject) + message); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java new file mode 100644 index 0000000000..221e57eebb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.LogActor; + +import java.util.LinkedList; +import java.util.Deque; + +public class CurrentActor +{ + private static final ThreadLocal<Deque<LogActor>> _currentActor = new ThreadLocal<Deque<LogActor>>() + { + protected Deque<LogActor> initialValue() + { + return new LinkedList<LogActor>(); + } + }; + + public static void set(LogActor actor) + { + Deque<LogActor> stack = _currentActor.get(); + stack.addFirst(actor); + } + + public static void remove() + { + Deque<LogActor> stack = _currentActor.get(); + stack.remove(); + } + + public static LogActor get() + { + return _currentActor.get().peek(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java new file mode 100644 index 0000000000..58d55a13bb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.RootMessageLogger; + +import java.text.MessageFormat; +import java.security.Principal; + +public class ManagementActor extends AbstractActor +{ + + /** + * LOG FORMAT for the ManagementActor, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + */ + public static final String MANAGEMENT_FORMAT = "mng:{0}({1}@{2})"; + + /** + * //todo Correct interface to provide connection details + * @param user + * @param rootLogger The RootLogger to use for this Actor + */ + public ManagementActor(Principal user, RootMessageLogger rootLogger) + { + super(rootLogger); + + _logString = "["+ MessageFormat.format(MANAGEMENT_FORMAT, + "<MNG:ConnectionID>", + user.getName(), + "<MNG:RemoteAddress>") + + "] "; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java new file mode 100644 index 0000000000..3774155626 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging.rawloggers; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.server.logging.RawMessageLogger; + +public class Log4jMessageLogger implements RawMessageLogger +{ + public static final String DEFAULT_LEVEL = "INFO"; + public static final String DEFAULT_LOGGER = "qpid.message"; + private Level _level; + private Logger _rawMessageLogger; + + public Log4jMessageLogger() + { + this(DEFAULT_LEVEL, DEFAULT_LOGGER); + } + + public Log4jMessageLogger(String level, String logger) + { + _level = Level.toLevel(level); + + _rawMessageLogger = Logger.getLogger(logger); + } + + public void rawMessage(String message) + { + rawMessage(message, null); + } + + public void rawMessage(String message, Throwable throwable) + { + _rawMessageLogger.log(_level, message, throwable); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java new file mode 100644 index 0000000000..4fb5bdcc93 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.logging.LogSubject; + +import java.text.MessageFormat; + +/** + * The LogSubjects all have a similar requriement to format their output and + * provide the String value. + * + * This Abstract LogSubject provides this basic functionality, allowing the + * actual LogSubjects to provide their formating and data. + */ +public abstract class AbstractLogSubject implements LogSubject +{ + /** + * The logString that will be returned via toString + */ + protected String logString; + + /** + * Set the toString logging of this LogSubject. Based on a format provided + * by format and the var args. + * @param format The Message to format + * @param args The values to put in to the message. + */ + protected void setLogStringWithFormat(String format, Object... args) + { + logString = "[" + MessageFormat.format(format, args) + "] "; + } + + /** + * ToString is how the Logging infrastructure will get the text for this + * LogSubject + * + * @return String representing this LogSubject + */ + @Override + public String toString() + { + return logString; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java new file mode 100644 index 0000000000..fd171fea5a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; + +public class BindingLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ChannelLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Virtualhost Name + * 1 - Exchange Type + * 2 - Exchange Name + * 3 - Queue Name + * 4 - Binding RoutingKey + */ + protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})/qu({3})/rk({4})"; + + /** + * Create a BindingLogSubject that Logs in the following format. + * + * [ vh(/)/ex(amq.direct)/qu(testQueue)/bd(testQueue) ] + * + * @param routingKey + * @param exchange + * @param queue + */ + public BindingLogSubject(AMQShortString routingKey, Exchange exchange, + AMQQueue queue) + { + setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(), + exchange.getType(), + exchange.getName(), + queue.getName(), + routingKey); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java new file mode 100644 index 0000000000..03afd0b772 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQProtocolSession; + +public class ChannelLogSubject extends AbstractLogSubject +{ + /** + * LOG FORMAT for the ChannelLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + * 4 - Channel ID + */ + public static String CHANNEL_FORMAT = ConnectionLogSubject.CONNECTION_FORMAT + + "/ch:{4}"; + + public ChannelLogSubject(AMQChannel channel) + { + AMQProtocolSession session = channel.getProtocolSession(); + + // Provide the value for the 4th replacement. + setLogStringWithFormat(CHANNEL_FORMAT, + session.getSessionID(), + session.getPrincipal().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName(), + channel.getChannelId()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java new file mode 100644 index 0000000000..65d65a24d2 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.protocol.AMQProtocolSession; + +/** The Connection LogSubject */ +public class ConnectionLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ConnectionLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + */ + public static final String CONNECTION_FORMAT = "con:{0}({1}@{2}/{3})"; + + public ConnectionLogSubject(AMQProtocolSession session) + { + setLogStringWithFormat(CONNECTION_FORMAT, session.getSessionID(), + session.getPrincipal().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java new file mode 100644 index 0000000000..21e5f5e4ce --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class ExchangeLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ExchangeLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Virtualhost Name + * 1 - Exchange Type + * 2 - Exchange Name + */ + protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})"; + + /** Create an ExchangeLogSubject that Logs in the following format. */ + public ExchangeLogSubject(Exchange exchange, VirtualHost vhost) + { + setLogStringWithFormat(BINDING_FORMAT, vhost.getName(), + exchange.getType(), exchange.getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java new file mode 100644 index 0000000000..89f31ef477 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.queue.AMQQueue; + +public class QueueLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ExchangeLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Virtualhost name + * 1 - queue name + */ + protected static String BINDING_FORMAT = "vh(/{0})/qu({1})"; + + /** Create an QueueLogSubject that Logs in the following format. */ + public QueueLogSubject(AMQQueue queue) + { + setLogStringWithFormat(BINDING_FORMAT, + queue.getVirtualHost().getName(), + queue.getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java new file mode 100644 index 0000000000..b68ef2e9a9 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.subscription.Subscription; + +public class SubscriptionLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the SubscriptionLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Subscription ID + * 1 - queue name + */ + protected static String BINDING_FORMAT = "sub:{0}(qu({1}))"; + + /** + * Create an QueueLogSubject that Logs in the following format. + * + * @param subscription + */ + public SubscriptionLogSubject(Subscription subscription) + { + + setLogStringWithFormat(BINDING_FORMAT, subscription.getSubscriptionID(), + subscription.getQueue().getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 2db16ef751..5f1615351b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -21,26 +21,39 @@ package org.apache.qpid.server.protocol; import org.apache.log4j.Logger; - +import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; -import org.apache.mina.common.CloseFuture; import org.apache.mina.transport.vmpipe.VmPipeAddress; - import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; +import org.apache.qpid.server.logging.actors.AMQPConnectionActor; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -54,7 +67,6 @@ import org.apache.qpid.transport.Sender; import javax.management.JMException; import javax.security.sasl.SaslServer; - import java.net.InetSocketAddress; import java.net.SocketAddress; import java.security.Principal; @@ -64,6 +76,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicLong; public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { @@ -71,6 +84,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + private static final AtomicLong idGenerator = new AtomicLong(0); + // to save boxing the channelId and looking up in a map... cache in an array the low numbered // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; @@ -120,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L; private org.apache.mina.common.WriteFuture _lastWriteFuture; + // Create a simple ID that increments for ever new Session + private final long _sessionID = idGenerator.getAndIncrement(); + + private AMQPConnectionActor _actor; + public ManagedObject getManagedObject() { return _managedObject; @@ -134,6 +154,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _codecFactory = codecFactory; + _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); + try { IoServiceConfig config = session.getServiceConfig(); @@ -158,6 +180,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _codecFactory = codecFactory; + _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); } private AMQProtocolSessionMBean createMBean() throws AMQException @@ -183,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return (AMQProtocolSession) minaProtocolSession.getAttachment(); } + public long getSessionID() + { + return _sessionID; + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; @@ -235,6 +263,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } + CurrentActor.set(_actor); try { body.handle(channelId, this); @@ -244,7 +273,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable closeChannel(channelId); throw e; } - + finally + { + CurrentActor.remove(); + } } private void protocolInitiationReceived(ProtocolInitiation pi) @@ -796,6 +828,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { _virtualHost = virtualHost; + _actor.virtualHostSelected(this); + _virtualHost.getConnectionRegistry().registerConnection(this); _managedObject = createMBean(); @@ -820,6 +854,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void setAuthorizedID(Principal authorizedID) { _authorizedID = authorizedID; + + // Let the actor know that this connection is now Authorized + _actor.connectionAuthorized(this); } public Principal getPrincipal() @@ -827,6 +864,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return _authorizedID; } + public SocketAddress getRemoteAddress() + { + return _minaProtocolSession.getRemoteAddress(); + } + public MethodRegistry getMethodRegistry() { return MethodRegistry.getMethodRegistry(getProtocolVersion()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 1e8dd9f77a..b15e6a4219 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -37,6 +37,7 @@ import java.security.Principal; public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder { + long getSessionID(); public static final class ProtocolSessionIdentifier { @@ -196,6 +197,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin void setAuthorizedID(Principal authorizedID); + public java.net.SocketAddress getRemoteAddress(); + public MethodRegistry getMethodRegistry(); public MethodDispatcher getMethodDispatcher(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 1fce03b1c3..8c24acccbf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -274,6 +274,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException { + exchange.registerQueue(routingKey, this, arguments); if (isDurable() && exchange.isDurable()) { @@ -281,6 +282,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } _bindings.addBinding(routingKey, arguments, exchange); +// ExchangeBinding binding = new ExchangeBinding(routingKey, exchange, arguments); + + //fixme MR logging in progress +// _bindings.addBinding(binding); +// +// if (_logger.isMessageEnabled(binding)) +// { +// _logger.message(binding, "QM-1001 : Created Binding"); +// } } public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 22b4623ae1..b58b849133 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.logging.RootMessageLogger; /** * An abstract application registry that provides access to configuration information and handles the @@ -70,6 +71,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry protected PluginManager _pluginManager; + protected RootMessageLogger _rootMessageLogger; + static { Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService())); @@ -287,4 +290,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return _pluginManager; } + public RootMessageLogger getRootMessageLogger() + { + return _rootMessageLogger; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index 39164883f9..31a85b878a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -33,6 +33,8 @@ import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalD import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { @@ -44,9 +46,12 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { + _rootMessageLogger = new RootMessageLoggerImpl(_configuration, + new Log4jMessageLogger()); + initialiseManagedObjectRegistry(); - _virtualHostRegistry = new VirtualHostRegistry(); + _virtualHostRegistry = new VirtualHostRegistry(this); _pluginManager = new PluginManager(_configuration.getPluginDirectory()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index bbfda3addc..7d17639f22 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.mina.common.IoAcceptor; public interface IApplicationRegistry @@ -69,6 +70,8 @@ public interface IApplicationRegistry PluginManager getPluginManager(); + RootMessageLogger getRootMessageLogger(); + /** * Register any acceptors for this registry * @param bindAddress The address that the acceptor has been bound with diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 87f4fd5c7c..2053082fb2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -50,6 +50,8 @@ public interface Subscription AMQShortString getConsumerTag(); + long getSubscriptionID(); + boolean isSuspended(); boolean hasInterest(QueueEntry msg); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 68581acc14..26a3e9bebc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -69,6 +70,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); private final Lock _stateChangeLock; + private static final AtomicLong idGenerator = new AtomicLong(0); + // Create a simple ID that increments for ever new Subscription + private final long _subscriptionID = idGenerator.getAndIncrement(); + + static final class BrowserSubscription extends SubscriptionImpl { public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession, @@ -535,6 +541,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return _consumerTag; } + public long getSubscriptionID() + { + return _subscriptionID; + } + public AMQProtocolSession getProtocolSession() { return _channel.getProtocolSession(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index eda2d3a94e..9ef1e029d3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -20,17 +20,12 @@ */ package org.apache.qpid.server.util; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Properties; - -import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.MapConfiguration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger; import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -41,6 +36,10 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import java.util.Arrays; +import java.util.Collection; +import java.util.Properties; + public class NullApplicationRegistry extends ApplicationRegistry { public NullApplicationRegistry() throws ConfigurationException @@ -51,9 +50,11 @@ public class NullApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { _logger.info("Initialising NullApplicationRegistry"); - + + _rootMessageLogger = new RootMessageLoggerImpl(_configuration, new Log4jMessageLogger()); + _configuration.setHousekeepingExpiredMessageCheckPeriod(200); - + Properties users = new Properties(); users.put("guest", "guest"); @@ -65,7 +66,7 @@ public class NullApplicationRegistry extends ApplicationRegistry _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); _managedObjectRegistry = new NoopManagedObjectRegistry(); - _virtualHostRegistry = new VirtualHostRegistry(); + _virtualHostRegistry = new VirtualHostRegistry(this); PropertiesConfiguration vhostProps = new PropertiesConfiguration(); VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps); VirtualHost dummyHost = new VirtualHost(hostConfig); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java index 27917fac8a..5543adbeb5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java @@ -20,6 +20,9 @@ */
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -32,6 +35,12 @@ public class VirtualHostRegistry private String _defaultVirtualHostName;
+ private ApplicationRegistry _applicationRegistry;
+
+ public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
+ {
+ _applicationRegistry = applicationRegistry;
+ }
public synchronized void registerVirtualHost(VirtualHost host) throws Exception
{
@@ -67,4 +76,9 @@ public class VirtualHostRegistry {
return new ArrayList<VirtualHost>(_registry.values());
}
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ return _applicationRegistry;
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/RootMessageLoggerImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/RootMessageLoggerImplTest.java new file mode 100644 index 0000000000..012a590687 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/RootMessageLoggerImplTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +import junit.framework.TestCase; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger; + +import java.util.List; + +public class RootMessageLoggerImplTest extends TestCase +{ + + RootMessageLogger _rootLogger; + UnitTestMessageLogger _rawLogger; + + public void setUp() throws ConfigurationException + { + Configuration config = new PropertiesConfiguration(); + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + + _rootLogger = new RootMessageLoggerImpl(serverConfig, _rawLogger); + } + + public void tearDown() + { + _rawLogger.clearLogMessages(); + } + + public void testLog() + { + String message = "test logging"; + + _rootLogger.rawMessage(message); + + List<Object> logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 1, logs.size()); + + assertTrue(logs.get(0).toString().contains(message)); + } + + public void testLogWithThrowable() + { + String message = "test logging"; + Exception exception = new Exception("Test"); + + _rootLogger.rawMessage(message, exception); + + List<Object> logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 2, logs.size()); + + String loggedMessage = (String) logs.get(0); + assertTrue("Message not found in log:" + loggedMessage, + loggedMessage.contains(message)); + + Exception fromLog = (Exception) logs.get(1); + assertEquals(exception.getMessage(), fromLog.getMessage()); + } + + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/StatusUpdateConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/StatusUpdateConfigurationTest.java new file mode 100644 index 0000000000..9a3c18bf99 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/StatusUpdateConfigurationTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +import junit.framework.TestCase; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; + +/** + * Set of test to validate the effects of the changes made to the + * ServerConfiguration to enable the enabling/disabling of status update + * messages. + * + * The default is to on. + */ +public class StatusUpdateConfigurationTest extends TestCase +{ + + /** + * Validate that with no configuration the status updates will default to + * enabled. + * @throws org.apache.commons.configuration.ConfigurationException + * - if there was a problem in creating the configuratino + */ + public void testEnabled() throws ConfigurationException + { + + ServerConfiguration serverConfig = new ServerConfiguration( + new PropertiesConfiguration()); + + assertTrue("Status Updates not enabled as expected.", + serverConfig.getStatusUpdates()); + } + + + /** + * Validate that through the config it is possible to disable status updates + * @throws org.apache.commons.configuration.ConfigurationException + * - if there was a problem in creating the configuratino + */ + public void testUpdateControls() throws ConfigurationException + { + + Configuration config = new PropertiesConfiguration(); + ServerConfiguration serverConfig = new ServerConfiguration(config); + + config.setProperty("status-updates", "off"); + + + assertFalse("Status Updates should not be enabled.", + serverConfig.getStatusUpdates()); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java new file mode 100644 index 0000000000..298e3bc22c --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java @@ -0,0 +1,206 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import junit.framework.TestCase; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.MockProtocolSession; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.logging.actors.AMQPConnectionActor; +import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.AMQChannel; + +import java.util.List; + +/** + * Test : AMQPConnectionActorTest + * Validate the AMQPConnectionActor class. + * + * The test creates a new AMQPActor and then logs a message using it. + * + * The test then verifies that the logged message was the only one created and + * that the message contains the required message. + */ +public class AMQPChannelActorTest extends TestCase +{ + + LogActor _amqpActor; + UnitTestMessageLogger _rawLogger; + + public void setUp() throws ConfigurationException, AMQException + { + Configuration config = new PropertiesConfiguration(); + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + RootMessageLogger rootLogger = + new RootMessageLoggerImpl(serverConfig, _rawLogger); + + // Create a single session for this test. + // Re-use is ok as we are testing the LogActor object is set correctly, + // not the value of the output. + AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore()); + // Use the first Virtualhost that has been defined to initialise + // the MockProtocolSession. This prevents a NPE when the + // AMQPActor attempts to lookup the name of the VHost. + try + { + session.setVirtualHost(ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts(). + toArray(new VirtualHost[1])[0]); + } + catch (AMQException e) + { + fail("Unable to set virtualhost on session:" + e.getMessage()); + } + + + AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore()); + + _amqpActor = new AMQPChannelActor(channel, rootLogger); + + } + + public void tearDown() + { + _rawLogger.clearLogMessages(); + } + + /** + * Test that when logging on behalf of the channel + * The test sends a message then verifies that it entered the logs. + * + * The log message should be fully repalaced (no '{n}' values) and should + * contain the channel id ('/ch:1') identification. + */ + public void testChannel() + { + final String message = "test logging"; + + _amqpActor.message(new LogSubject() + { + public String toString() + { + return "[AMQPActorTest]"; + } + + }, new LogMessage() + { + public String toString() + { + return message; + } + }); + + List<Object> logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 1, logs.size()); + + // Verify that the logged message is present in the output + assertTrue("Message was not found in log message:" + logs.get(0), + logs.get(0).toString().contains(message)); + + // Verify that the message has the correct type + assertTrue("Message contains the [con: prefix", + logs.get(0).toString().contains("[con:")); + + + // Verify that all the values were presented to the MessageFormatter + // so we will not end up with '{n}' entries in the log. + assertFalse("Verify that the string does not contain any '{'." + logs.get(0), + logs.get(0).toString().contains("{")); + + // Verify that the logged message contains the 'ch:1' marker + assertTrue("Message was not logged as part of channel 1" + logs.get(0), + logs.get(0).toString().contains("/ch:1")); + + } + + public void testChannelLoggingOff() throws ConfigurationException, AMQException + { + Configuration config = new PropertiesConfiguration(); + config.addProperty("status-updates", "OFF"); + + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + RootMessageLogger rootLogger = + new RootMessageLoggerImpl(serverConfig, _rawLogger); + + // Create a single session for this test. + // Re-use is ok as we are testing the LogActor object is set correctly, + // not the value of the output. + AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore()); + // Use the first Virtualhost that has been defined to initialise + // the MockProtocolSession. This prevents a NPE when the + // AMQPActor attempts to lookup the name of the VHost. + try + { + session.setVirtualHost(ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts(). + toArray(new VirtualHost[1])[0]); + } + catch (AMQException e) + { + fail("Unable to set virtualhost on session:" + e.getMessage()); + } + + + AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore()); + + _amqpActor = new AMQPChannelActor(channel, rootLogger); + + final String message = "test logging"; + + _amqpActor.message(new LogSubject() + { + public String toString() + { + return "[AMQPActorTest]"; + } + + }, new LogMessage() + { + public String toString() + { + return message; + } + }); + + List<Object> logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 0, logs.size()); + + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java new file mode 100644 index 0000000000..c220865864 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java @@ -0,0 +1,202 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import junit.framework.TestCase; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.MockProtocolSession; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.logging.actors.AMQPConnectionActor; +import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogActor; + +import java.util.List; + +/** + * Test : AMQPConnectionActorTest + * Validate the AMQPConnectionActor class. + * + * The test creates a new AMQPActor and then logs a message using it. + * + * The test then verifies that the logged message was the only one created and + * that the message contains the required message. + */ +public class AMQPConnectionActorTest extends TestCase +{ + + LogActor _amqpActor; + UnitTestMessageLogger _rawLogger; + + public void setUp() throws ConfigurationException + { + Configuration config = new PropertiesConfiguration(); + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + RootMessageLogger rootLogger = + new RootMessageLoggerImpl(serverConfig, _rawLogger); + + // Create a single session for this test. + // Re-use is ok as we are testing the LogActor object is set correctly, + // not the value of the output. + AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore()); + // Use the first Virtualhost that has been defined to initialise + // the MockProtocolSession. This prevents a NPE when the + // AMQPActor attempts to lookup the name of the VHost. + try + { + session.setVirtualHost(ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts(). + toArray(new VirtualHost[1])[0]); + } + catch (AMQException e) + { + fail("Unable to set virtualhost on session:" + e.getMessage()); + } + + _amqpActor = new AMQPConnectionActor(session, rootLogger); + } + + public void tearDown() + { + _rawLogger.clearLogMessages(); + } + + /** + * Test the AMQPActor logging as a Connection level. + * + * The test sends a message then verifies that it entered the logs. + * + * The log message should be fully repalaced (no '{n}' values) and should + * not contain any channel identification. + * + */ + public void testConnection() + { + final String message = "test logging"; + + _amqpActor.message(new LogSubject() + { + public String toString() + { + return "[AMQPActorTest]"; + } + + }, new LogMessage() + { + public String toString() + { + return message; + } + }); + + List<Object> logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 1, logs.size()); + + // Verify that the logged message is present in the output + assertTrue("Message was not found in log message", + logs.get(0).toString().contains(message)); + + // Verify that the message has the correct type + assertTrue("Message contains the [con: prefix", + logs.get(0).toString().contains("[con:")); + + // Verify that all the values were presented to the MessageFormatter + // so we will not end up with '{n}' entries in the log. + assertFalse("Verify that the string does not contain any '{'.", + logs.get(0).toString().contains("{")); + + // Verify that the logged message does not contains the 'ch:' marker + assertFalse("Message was logged with a channel identifier." + logs.get(0), + logs.get(0).toString().contains("/ch:")); + } + + + + public void testConnectionLoggingOff() throws ConfigurationException, AMQException + { + Configuration config = new PropertiesConfiguration(); + config.addProperty("status-updates", "OFF"); + + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + RootMessageLogger rootLogger = + new RootMessageLoggerImpl(serverConfig, _rawLogger); + + // Create a single session for this test. + // Re-use is ok as we are testing the LogActor object is set correctly, + // not the value of the output. + AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore()); + // Use the first Virtualhost that has been defined to initialise + // the MockProtocolSession. This prevents a NPE when the + // AMQPActor attempts to lookup the name of the VHost. + try + { + session.setVirtualHost(ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts(). + toArray(new VirtualHost[1])[0]); + } + catch (AMQException e) + { + fail("Unable to set virtualhost on session:" + e.getMessage()); + } + + + _amqpActor = new AMQPConnectionActor(session, rootLogger); + + final String message = "test logging"; + + _amqpActor.message(new LogSubject() + { + public String toString() + { + return "[AMQPActorTest]"; + } + + }, new LogMessage() + { + public String toString() + { + return message; + } + }); + + List<Object> logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 0, logs.size()); + + } + + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java new file mode 100644 index 0000000000..c1cc3253a8 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.MockProtocolSession; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; + +/** + * Test : CurrentActorTest + * Summary: + * Validate ThreadLocal operation. + * + * Test creates THREADS number of threads which all then execute the same test + * together ( as close as looping Thread.start() will allow). + * + * Test: + * Test sets the CurrentActor then proceeds to retrieve the value and use it. + * + * The test also validates that it is the same LogActor that this thread set. + * + * Finally the LogActor is removed and tested to make sure that it was + * successfully removed. + * + * By having a higher number of threads than would normally be used in the + * Poolling filter we aim to catch the race condition where a ThreadLocal remove + * is called before one or more threads call get(). This way we can ensure that + * the remove does not affect more than the Thread it was called in. + */ +public class CurrentActorTest extends TestCase +{ + //Set this to be a reasonably large number + int THREADS = 10; + + // Record any exceptions that are thrown by the threads + final Exception[] _errors = new Exception[THREADS]; + + // Create a single session for this test. + AMQProtocolSession session; + + public void setUp() + { + // Create a single session for this test. + // Re-use is ok as we are testing the LogActor object is set correctly, + // not the value of the output. + session = new MockProtocolSession(new MemoryMessageStore()); + // Use the first Virtualhost that has been defined to initialise + // the MockProtocolSession. This prevents a NPE when the + // AMQPActor attempts to lookup the name of the VHost. + try + { + session.setVirtualHost(ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts(). + toArray(new VirtualHost[1])[0]); + } + catch (AMQException e) + { + fail("Unable to set virtualhost on session:" + e.getMessage()); + } + } + + public void testFIFO() throws AMQException + { + // Create a new actor using retrieving the rootMessageLogger from + // the default ApplicationRegistry. + //fixme reminder that we need a better approach for broker testing. + AMQPConnectionActor connectionActor = new AMQPConnectionActor(session, + ApplicationRegistry.getInstance(). + getRootMessageLogger()); + + CurrentActor.set(connectionActor); + + //Use the Actor to send a simple message + CurrentActor.get().message(new LogSubject() + { + public String toString() + { + return "[CurrentActorTest] "; + } + + }, new LogMessage() + { + public String toString() + { + return "Connection Log Msg"; + } + }); + + // Verify it was the same actor as we set earlier + assertEquals("Retrieved actor is not as expected ", + connectionActor, CurrentActor.get()); + + /** + * Set the actor to nwo be the Channel actor so testing the ability + * to push the actor on to the stack + */ + + AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore()); + + AMQPChannelActor channelActor = new AMQPChannelActor(channel, + ApplicationRegistry.getInstance(). + getRootMessageLogger()); + + CurrentActor.set(channelActor); + + //Use the Actor to send a simple message + CurrentActor.get().message(new LogSubject() + { + public String toString() + { + return "[CurrentActorTest] "; + } + + }, new LogMessage() + { + public String toString() + { + return "Channel Log Msg"; + } + }); + + // Verify it was the same actor as we set earlier + assertEquals("Retrieved actor is not as expected ", + channelActor, CurrentActor.get()); + + // Remove the ChannelActor from the stack + CurrentActor.remove(); + + // Verify we now have the same connection actor as we set earlier + assertEquals("Retrieved actor is not as expected ", + connectionActor, CurrentActor.get()); + + // Verify that removing the last actor returns us to a null value. + CurrentActor.remove(); + + assertNull("CurrentActor should be null", CurrentActor.get()); + + } + + public void testThreadLocal() + { + + // Setup the threads + Thread[] threads = new Thread[THREADS]; + for (int count = 0; count < THREADS; count++) + { + Runnable test = new Test(count); + threads[count] = new Thread(test); + } + + //Run the threads + for (int count = 0; count < THREADS; count++) + { + threads[count].start(); + } + + // Wait for them to finish + for (int count = 0; count < THREADS; count++) + { + try + { + threads[count].join(); + } + catch (InterruptedException e) + { + //if we are interrupted then we will exit shortly. + } + } + + // Verify that none of the tests threw an exception + for (int count = 0; count < THREADS; count++) + { + if (_errors[count] != null) + { + _errors[count].printStackTrace(); + fail("Error occured in thread:" + count); + } + } + } + + public class Test implements Runnable + { + int count; + + Test(int count) + { + this.count = count; + } + + public void run() + { + + // Create a new actor using retrieving the rootMessageLogger from + // the default ApplicationRegistry. + //fixme reminder that we need a better approach for broker testing. + AMQPConnectionActor actor = new AMQPConnectionActor(session, + ApplicationRegistry.getInstance(). + getRootMessageLogger()); + + CurrentActor.set(actor); + + try + { + //Use the Actor to send a simple message + CurrentActor.get().message(new LogSubject() + { + public String toString() + { + return "[CurrentActorTest] "; + } + + }, new LogMessage() + { + public String toString() + { + return "Running Thread:" + count; + } + }); + + // Verify it was the same actor as we set earlier + assertEquals("Retrieved actor is not as expected ", + actor, CurrentActor.get()); + + // Verify that removing the actor works for this thread + CurrentActor.remove(); + + assertNull("CurrentActor should be null", CurrentActor.get()); + } + catch (Exception e) + { + _errors[count] = e; + } + + } + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java new file mode 100644 index 0000000000..fa0bb6529e --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import junit.framework.TestCase; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger; + +import java.security.Principal; +import java.util.List; + +/** + * Test : AMQPConnectionActorTest + * Validate the AMQPConnectionActor class. + * + * The test creates a new AMQPActor and then logs a message using it. + * + * The test then verifies that the logged message was the only one created and + * that the message contains the required message. + */ +public class ManagementActorTest extends TestCase +{ + + LogActor _amqpActor; + UnitTestMessageLogger _rawLogger; + + public void setUp() throws ConfigurationException + { + Configuration config = new PropertiesConfiguration(); + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + RootMessageLogger rootLogger = + new RootMessageLoggerImpl(serverConfig, _rawLogger); + + _amqpActor = new ManagementActor(new Principal() + { + public String getName() + { + return "ManagementActorTest"; + } + }, rootLogger); + } + + public void tearDown() + { + _rawLogger.clearLogMessages(); + } + + /** + * Test the AMQPActor logging as a Connection level. + * + * The test sends a message then verifies that it entered the logs. + * + * The log message should be fully repalaced (no '{n}' values) and should + * not contain any channel identification. + */ + public void testConnection() + { + final String message = "test logging"; + + _amqpActor.message(new LogSubject() + { + public String toString() + { + return "[AMQPActorTest]"; + } + + }, new LogMessage() + { + public String toString() + { + return message; + } + }); + + List<Object> logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 1, logs.size()); + + // Verify that the logged message is present in the output + assertTrue("Message was not found in log message", + logs.get(0).toString().contains(message)); + + // Verify that all the values were presented to the MessageFormatter + // so we will not end up with '{n}' entries in the log. + assertFalse("Verify that the string does not contain any '{'.", + logs.get(0).toString().contains("{")); + + // Verify that the message has the correct type + assertTrue("Message contains the [mng: prefix", + logs.get(0).toString().contains("[mng:")); + + // Verify that the logged message does not contains the 'ch:' marker + assertFalse("Message was logged with a channel identifier." + logs.get(0), + logs.get(0).toString().contains("/ch:")); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/TestBlankActor.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/TestBlankActor.java new file mode 100644 index 0000000000..ec84d8bc9b --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/TestBlankActor.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.RootMessageLogger; + +public class TestBlankActor extends AbstractActor +{ + public TestBlankActor(RootMessageLogger rootLogger) + { + super(rootLogger); + _logString = "[Blank]"; + } +} +
\ No newline at end of file diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java new file mode 100644 index 0000000000..d7a5aa667b --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging.rawloggers; + +import junit.framework.TestCase; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +/** Test that the Log4jMessageLogger defaults behave as expected */ +public class Log4jMessageLoggerTest extends TestCase +{ + private File _lodgfile; + + Level _rootLevel; + Log4jTestAppender _appender; + + @Override + public void setUp() throws IOException + { + // Setup a file for logging + _appender = new Log4jTestAppender(); + + Logger root = Logger.getRootLogger(); + root.addAppender(_appender); + + _rootLevel = Logger.getRootLogger().getLevel(); + if (_rootLevel != Level.INFO) + { + root.setLevel(Level.INFO); + root.warn("Root Logger set to:" + _rootLevel + " Resetting to INFO for test."); + } + root.warn("Adding Test Appender:" + _appender); + } + + @Override + public void tearDown() + { + Logger root = Logger.getRootLogger(); + root.warn("Removing Test Appender:" + _appender); + root.warn("Resetting Root Level to : " + _rootLevel); + + Logger.getRootLogger().setLevel(_rootLevel); + + Logger.getRootLogger().removeAppender(_appender); + + //Call close on our appender. This will clear the log messages + // from Memory + _appender.close(); + } + + /** + * Verify that the default configuraion of Log4jMessageLogger will + * log a message. + * + * @throws IOException + * @throws InterruptedException + */ + public void testDefaultLogsMessage() throws IOException, InterruptedException + { + // Create a logger to test + Log4jMessageLogger logger = new Log4jMessageLogger(); + + //Create Message for test + String message = "testDefaults"; + + // Log the message + logger.rawMessage(message); + + verifyLogPresent(message); + } + + /** + * This test checks that if the Root Logger level is set such that the INFO + * messages would not be logged then the Log4jMessageLogger default of INFO + * will result in logging not being presented. + * + * @throws IOException + * @throws InterruptedException + */ + public void testDefaultsLogsAtInfo() throws IOException, InterruptedException + { + // Create a logger to test + Log4jMessageLogger logger = new Log4jMessageLogger(); + + //Create Message for test + String message = "testDefaults"; + + //Set default logger level to off + Logger.getRootLogger().setLevel(Level.WARN); + + // Log the message + logger.rawMessage(message); + + verifyNoLog(message); + } + + /** + * Test that changing the logger works. + * <p/> + * Test this by setting the default logger level to off which has been + * verified to work by test 'testDefaultsLevelObeyed' + * + * @throws IOException + * @throws InterruptedException + */ + public void testDefaultLoggerAdjustment() throws IOException, InterruptedException + { + String loggerName = "TestLogger"; + // Create a logger to test + Log4jMessageLogger logger = new Log4jMessageLogger(Log4jMessageLogger.DEFAULT_LEVEL, loggerName); + + //Create Message for test + String message = "testDefaults"; + + //Disable the default Log4jMessageLogger logger + Level originalLevel = Logger.getLogger(Log4jMessageLogger.DEFAULT_LOGGER).getLevel(); + Logger.getLogger(Log4jMessageLogger.DEFAULT_LOGGER).setLevel(Level.OFF); + + // Log the message + logger.rawMessage(message); + + verifyLogPresent(message); + + // Restore the logging level + Logger.getLogger(Log4jMessageLogger.DEFAULT_LOGGER).setLevel(originalLevel); + } + + /** + * Test that changing the log level has an effect. + * Set the level to be debug + * but only set the logger to log at INFO + * there should be no data printed + * subsequently changing the root logger to allow DEBUG should + * show the message + * + * @throws IOException + * @throws InterruptedException + */ + public void testDefaultsLevelObeyed() throws IOException, InterruptedException + { + // Create a logger to test + Log4jMessageLogger logger = new Log4jMessageLogger("DEBUG", Log4jMessageLogger.DEFAULT_LOGGER); + + //Create Message for test + String message = "testDefaults"; + + //Set root logger to INFO only + Logger.getRootLogger().setLevel(Level.INFO); + + // Log the message + logger.rawMessage(message); + + verifyNoLog(message); + + //Set root logger to INFO only + Logger.getRootLogger().setLevel(Level.DEBUG); + + // Log the message + logger.rawMessage(message); + + verifyLogPresent(message); + } + + /** + * Check that the Log Message reached log4j + * @param message the message to search for + */ + private void verifyLogPresent(String message) + { + List<String> results = findMessageInLog(message); + + //Validate we only got one message + assertEquals("The result set was not as expected.", 1, results.size()); + + // Validate message + String line = results.get(0); + + assertNotNull("No Message retrieved from log file", line); + assertTrue("Message not contained in log.:" + line, + line.contains(message)); + } + + /** + * Check that the given Message is not present in the log4j records. + * @param message the message to search for + */ + private void verifyNoLog(String message) + { + List<String> results = findMessageInLog(message); + + //Validate we only got one message + if (results.size() > 0) + { + System.err.println("Unexpected Log messages"); + + for (String msg : results) + { + System.err.println(msg); + } + } + + assertEquals("No messages expected.", 0, results.size()); + } + + /** + * Get the appenders list of events and return a list of all the messages + * that contain the given message + * + * @param message the search string + * @return The list of all logged messages that contain the search string. + */ + private List<String> findMessageInLog(String message) + { + List<LoggingEvent> log = _appender.getLog(); + + // Search Results for requested message + List<String> result = new LinkedList<String>(); + + for (LoggingEvent event : log) + { + if (String.valueOf(event.getMessage()).contains(message)) + { + result.add(String.valueOf(event.getMessage())); + } + } + + return result; + } + + + /** + * Log4j Appender that simply records all the Logging Events so we can + * verify that the above logging will make it to log4j in a unit test. + */ + private class Log4jTestAppender extends AppenderSkeleton + { + List<LoggingEvent> _log = new LinkedList<LoggingEvent>(); + + protected void append(LoggingEvent loggingEvent) + { + _log.add(loggingEvent); + } + + public void close() + { + _log.clear(); + } + + /** + * @return the list of LoggingEvents that have occured in this Appender + */ + public List<LoggingEvent> getLog() + { + return _log; + } + + public boolean requiresLayout() + { + return false; + } + } +} + diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLogger.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLogger.java new file mode 100644 index 0000000000..df50cfb57a --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLogger.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging.rawloggers; + +import org.apache.qpid.server.logging.RawMessageLogger; + +import java.util.List; +import java.util.LinkedList; + +public class UnitTestMessageLogger implements RawMessageLogger +{ + List<Object> _log; + + public UnitTestMessageLogger() + { + _log = new LinkedList<Object>(); + } + + + public void rawMessage(String message) + { + _log.add(message); + } + + public void rawMessage(String message, Throwable throwable) + { + _log.add(message); + _log.add(throwable); + } + + + public List<Object> getLogMessages() + { + return _log; + } + + public void clearLogMessages() + { + _log.clear(); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLoggerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLoggerTest.java new file mode 100644 index 0000000000..e10de48432 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLoggerTest.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.rawloggers; + +import junit.framework.TestCase; + +import java.util.List; + +/** + * Test: UnitTestMessageLoggerTest + * + * This test verifies that UnitTestMessageLogger adhears to its interface. + * + * Messages are logged, and Throwables recorded in an array that can be + * retreived and cleared. + * + */ +public class UnitTestMessageLoggerTest extends TestCase +{ + private static final String TEST_MESSAGE = "Test"; + private static final String TEST_THROWABLE = "Test Throwable"; + + public void testRawMessage() + { + UnitTestMessageLogger logger = new UnitTestMessageLogger(); + + assertEquals("Messages logged before test start", 0, + logger.getLogMessages().size()); + + // Log a message + logger.rawMessage(TEST_MESSAGE); + + List<Object> messages = logger.getLogMessages(); + + assertEquals("Expected to have 1 messages logged", 1, messages.size()); + + assertEquals("First message not what was logged", + TEST_MESSAGE, messages.get(0)); + } + + public void testRawMessageWithThrowable() + { + UnitTestMessageLogger logger = new UnitTestMessageLogger(); + + assertEquals("Messages logged before test start", 0, + logger.getLogMessages().size()); + + // Log a message + Throwable throwable = new Throwable(TEST_THROWABLE); + + logger.rawMessage(TEST_MESSAGE, throwable); + + List<Object> messages = logger.getLogMessages(); + + assertEquals("Expected to have 2 entries", 2, messages.size()); + + assertEquals("Message text not what was logged", + TEST_MESSAGE, messages.get(0)); + + assertEquals("Message throwable not what was logged", + TEST_THROWABLE, ((Throwable) messages.get(1)).getMessage()); + + } + + public void testClear() + { + UnitTestMessageLogger logger = new UnitTestMessageLogger(); + + assertEquals("Messages logged before test start", 0, + logger.getLogMessages().size()); + + // Log a message + logger.rawMessage(TEST_MESSAGE); + + assertEquals("Expected to have 1 messages logged", + 1, logger.getLogMessages().size()); + + logger.clearLogMessages(); + + assertEquals("Expected to have no messagse after a clear", + 0, logger.getLogMessages().size()); + + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java new file mode 100644 index 0000000000..04081db8e3 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import junit.framework.TestCase; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.actors.TestBlankActor; +import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.List; + +public abstract class AbstractTestLogSubject extends TestCase +{ + protected Configuration _config = new PropertiesConfiguration(); + protected LogSubject _subject = null; + + protected List<Object> performLog() throws ConfigurationException + { + if (_subject == null) + { + throw new NullPointerException("LogSubject has not been set"); + } + + ServerConfiguration serverConfig = new ServerConfiguration(_config); + + UnitTestMessageLogger logger = new UnitTestMessageLogger(); + RootMessageLogger rootLogger = + new RootMessageLoggerImpl(serverConfig, logger); + + LogActor actor_actor = new TestBlankActor(rootLogger); + + actor_actor.message(_subject, new LogMessage() + { + public String toString() + { + return "<Log Message>"; + } + }); + + return logger.getLogMessages(); + } + + /** + * Verify that the connection section has the expected items + * + * @param connectionID - The connection id (int) to check for + * @param user - the Connected username + * @param ipString - the ipString/hostname + * @param vhost - the virtualhost that the user connected to. + * @param message - the message these values should appear in. + */ + protected void verifyConnection(long connectionID, String user, String ipString, String vhost, String message) + { + // This should return us MockProtocolSessionUser@null/test + String connectionSlice = getSlice("con:" + connectionID, message); + + assertNotNull("Unable to find connection 'con:" + connectionID + "'", + connectionSlice); + + // Exract the userName + String[] userNameParts = connectionSlice.split("@"); + + assertEquals("Unable to split Username from rest of Connection:" + + connectionSlice, 2, userNameParts.length); + + assertEquals("Username not as expected", userNameParts[0], user); + + // Extract IP. + String[] ipParts = userNameParts[1].split("/"); + + assertEquals("Unable to split IP from rest of Connection:" + + userNameParts[1], 2, ipParts.length); + + assertEquals("IP not as expected", ipParts[0], ipString); + + //Finally check vhost + assertEquals("Virtualhost name not as expected.", vhost, ipParts[1]); + } + + /** + * Verify that the RoutingKey is present in the provided message. + * + * @param message The message to check + * @param routingKey The routing key to check against + */ + protected void verifyRoutingKey(String message, AMQShortString routingKey) + { + String routingKeySlice = getSlice("rk", message); + + assertNotNull("Routing Key not found:" + message, routingKey); + + assertEquals("Routing key not correct", + routingKey.toString(), routingKeySlice); + } + + /** + * Verify that the given Queue's name exists in the provided message + * + * @param message The message to check + * @param queue The queue to check against + */ + protected void verifyQueue(String message, AMQQueue queue) + { + String queueSlice = getSlice("qu", message); + + assertNotNull("Queue not found:" + message, queueSlice); + + assertEquals("Queue name not correct", + queue.getName().toString(), queueSlice); + } + + /** + * Verify that the given exchange (name and type) are present in the + * provided message. + * + * @param message The message to check + * @param exchange the exchange to check against + */ + protected void verifyExchange(String message, Exchange exchange) + { + String exchangeSilce = getSlice("ex", message); + + assertNotNull("Exchange not found:" + message, exchangeSilce); + + String[] exchangeParts = exchangeSilce.split("/"); + + assertEquals("Exchange should be in two parts ex(type/name)", 2, + exchangeParts.length); + + assertEquals("Exchange type not correct", + exchange.getType().toString(), exchangeParts[0]); + + assertEquals("Exchange name not correct", + exchange.getName().toString(), exchangeParts[1]); + + } + + /** + * Verify that a VirtualHost with the given name appears in the given + * message. + * + * @param message the message to search + * @param vhost the vhostName to check against + */ + protected void verifyVirtualHost(String message, VirtualHost vhost) + { + String vhostSlice = getSlice("vh", message); + + assertNotNull("Virtualhost not found:" + message, vhostSlice); + + assertEquals("Virtualhost not correct", "/" + vhost.getName(), vhostSlice); + } + + /** + * Parse the log message and return the slice according to the following: + * Given Example: + * con:1(guest@127.0.0.1/test)/ch:2/ex(amq.direct)/qu(myQueue)/rk(myQueue) + * + * Each item (except channel) is of the format <key>(<values>) + * + * So Given an ID to slice on: + * con:1 - Connection 1 + * ex - exchange + * qu - queue + * rk - routing key + * + * @param sliceID the slice to locate + * @param message the message to search in + * + * @return the slice if found otherwise null is returned + */ + protected String getSlice(String sliceID, String message) + { + int indexOfSlice = message.indexOf(sliceID + "("); + + if (indexOfSlice == -1) + { + return null; + } + + int endIndex = message.indexOf(')', indexOfSlice); + + if (endIndex == -1) + { + return null; + } + + return message.substring(indexOfSlice + 1 + sliceID.length(), + endIndex); + } + + /** + * Test that when Logging occurs a single log statement is provided + * + * @throws ConfigurationException + */ + public void testEnabled() throws ConfigurationException + { + List<Object> logs = performLog(); + + assertEquals("Log has to many messagse", 1, logs.size()); + + validateLogStatement(String.valueOf(logs.get(0))); + } + + /** + * Call to the individiual tests to validate the message is formatted as + * expected + * + * @param message the message whos format needs validation + */ + protected abstract void validateLogStatement(String message); + + /** + * Ensure that when status-updates are off this does not perform logging + * + * @throws ConfigurationException + */ + public void testDisabled() throws ConfigurationException + { + _config.addProperty("status-updates", "OFF"); + + List<Object> logs = performLog(); + + assertEquals("Log has to many messagse", 0, logs.size()); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java new file mode 100644 index 0000000000..845d02267f --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class BindingLogSubjectTest extends AbstractTestLogSubject +{ + + AMQQueue _queue; + AMQShortString _routingKey; + Exchange _exchange; + VirtualHost _testVhost; + + public void setUp() throws Exception + { + super.setUp(); + + _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry(). + getVirtualHost("test"); + // Configure items for subjectCreation + _routingKey = new AMQShortString("RoutingKey"); + _exchange = _testVhost.getExchangeRegistry().getDefaultExchange(); + _queue = new MockAMQQueue("BindingLogSubjectTest"); + ((MockAMQQueue) _queue).setVirtualHost(_testVhost); + + _subject = new BindingLogSubject(_routingKey, _exchange, _queue); + } + + /** + * Validate that the logged Subject message is as expected: + * MESSAGE [Blank][vh(/test)/ex(direct/<<default>>)/qu(BindingLogSubjectTest)/rk(RoutingKey)] <Log Message> + * @param message the message whos format needs validation + */ + @Override + protected void validateLogStatement(String message) + { + verifyVirtualHost(message, _testVhost); + verifyExchange(message, _exchange); + verifyQueue(message, _queue); + verifyRoutingKey(message, _routingKey); + } + + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java new file mode 100644 index 0000000000..9d5cb70f4b --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.MockProtocolSession; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class ChannelLogSubjectTest extends ConnectionLogSubjectTest +{ + private final int _channelID = 1; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + // Create a single session for this test. + // Re-use is ok as we are testing the LogActor object is set correctly, + // not the value of the output. + _session = new MockProtocolSession(new MemoryMessageStore()); + // Use the first Virtualhost that has been defined to initialise + // the MockProtocolSession. This prevents a NPE when the + // AMQPActor attempts to lookup the name of the VHost. + try + { + _session.setVirtualHost(ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts(). + toArray(new VirtualHost[1])[0]); + } + catch (AMQException e) + { + fail("Unable to set virtualhost on session:" + e.getMessage()); + } + + AMQChannel channel = new AMQChannel(_session, _channelID, _session.getVirtualHost().getMessageStore()); + + _subject = new ChannelLogSubject(channel); + } + + /** + * MESSAGE [Blank][con:0(MockProtocolSessionUser@null/test)/ch:1] <Log Message> + * + * @param message the message whos format needs validation + */ + protected void validateLogStatement(String message) + { + // Use the ConnectionLogSubjectTest to vaildate that the connection + // section is ok + super.validateLogStatement(message); + + // Finally check that the channel identifier is correctly added + assertTrue("Channel 1 identifier not found as part of Subject", + message.contains(")/ch:1]")); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java new file mode 100644 index 0000000000..ff2d9b5e11 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.MockProtocolSession; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class ConnectionLogSubjectTest extends AbstractTestLogSubject +{ + AMQProtocolSession _session; + + public void setUp() throws Exception + { + super.setUp(); + + // Create a single session for this test. + // Re-use is ok as we are testing the LogActor object is set correctly, + // not the value of the output. + _session = new MockProtocolSession(new MemoryMessageStore()); + // Use the first Virtualhost that has been defined to initialise + // the MockProtocolSession. This prevents a NPE when the + // AMQPActor attempts to lookup the name of the VHost. + try + { + _session.setVirtualHost(ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts(). + toArray(new VirtualHost[1])[0]); + } + catch (AMQException e) + { + fail("Unable to set virtualhost on session:" + e.getMessage()); + } + + _subject = new ConnectionLogSubject(_session); + } + + /** + * MESSAGE [Blank][con:0(MockProtocolSessionUser@null/test)] <Log Message> + * + * @param message the message whos format needs validation + */ + protected void validateLogStatement(String message) + { + verifyConnection(_session.getSessionID(), "MockProtocolSessionUser", "null", "test", message); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java new file mode 100644 index 0000000000..35df4c5976 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.framing.AMQShortString; + +public class ExchangeLogSubjectTest extends AbstractTestLogSubject +{ + Exchange _exchange; + VirtualHost _testVhost; + + public void setUp() throws Exception + { + super.setUp(); + + _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry(). + getVirtualHost("test"); + + _exchange = _testVhost.getExchangeRegistry().getDefaultExchange(); + _subject = new ExchangeLogSubject(_exchange,_testVhost); + } + + /** + * Validate that the logged Subject message is as expected: + * MESSAGE [Blank][vh(/test)/ex(direct/<<default>>)] <Log Message> + * @param message the message whos format needs validation + */ + @Override + protected void validateLogStatement(String message) + { + verifyVirtualHost(message, _testVhost); + verifyExchange(message, _exchange); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java new file mode 100644 index 0000000000..7ef1f8d903 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class QueueLogSubjectTest extends AbstractTestLogSubject +{ + + AMQQueue _queue; + VirtualHost _testVhost; + + public void setUp() throws Exception + { + super.setUp(); + + _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry(). + getVirtualHost("test"); + + _queue = new MockAMQQueue("QueueLogSubjectTest"); + ((MockAMQQueue) _queue).setVirtualHost(_testVhost); + + _subject = new QueueLogSubject(_queue); + } + + /** + * Validate that the logged Subject message is as expected: + * MESSAGE [Blank][vh(/test)/qu(BindingLogSubjectTest)] <Log Message> + * + * @param message the message whos format needs validation + */ + @Override + protected void validateLogStatement(String message) + { + System.err.println(message); + verifyVirtualHost(message, _testVhost); + verifyQueue(message, _queue); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java new file mode 100644 index 0000000000..0b0b0d78d1 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.flow.LimitlessCreditManager; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.queue.MockProtocolSession; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactory; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class SubscriptionLogSubjectTest extends AbstractTestLogSubject +{ + + AMQQueue _queue; + VirtualHost _testVhost; + private boolean _acks; + private FieldTable _filters; + private boolean _noLocal; + private int _channelID = 1; + Subscription _subscription; + + public void setUp() throws Exception + { + super.setUp(); + + _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry(). + getVirtualHost("test"); + + _queue = new MockAMQQueue("QueueLogSubjectTest"); + ((MockAMQQueue) _queue).setVirtualHost(_testVhost); + + // Create a single session for this test. + // Re-use is ok as we are testing the LogActor object is set correctly, + // not the value of the output. + AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore()); + // Use the first Virtualhost that has been defined to initialise + // the MockProtocolSession. This prevents a NPE when the + // AMQPActor attempts to lookup the name of the VHost. + try + { + session.setVirtualHost(ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHosts(). + toArray(new VirtualHost[1])[0]); + } + catch (AMQException e) + { + fail("Unable to set virtualhost on session:" + e.getMessage()); + } + + AMQChannel channel = new AMQChannel(session, _channelID, session.getVirtualHost().getMessageStore()); + + session.addChannel(channel); + + SubscriptionFactory factory = new SubscriptionFactoryImpl(); + + _subscription = factory.createSubscription(_channelID, session, new AMQShortString("cTag"), + _acks, _filters, _noLocal, + new LimitlessCreditManager()); + + _subscription.setQueue(_queue); + + _subject = new SubscriptionLogSubject(_subscription); + } + + /** + * Validate that the logged Subject message is as expected: + * MESSAGE [Blank][sub:0(qu(QueueLogSubjectTest))] <Log Message> + * + * @param message the message whos format needs validation + */ + @Override + protected void validateLogStatement(String message) + { + String subscriptionSlice = getSlice("sub:" + + _subscription.getSubscriptionID(), + message); + + assertNotNull("Unable to locate subscription 'sub:" + + _subscription.getSubscriptionID() + "'"); + + // Adding the ')' is a bit ugly but SubscriptionLogSubject is the only + // Subject that nests () and so the simple parser of checking for the + // next ')' falls down. + verifyQueue(subscriptionSlice+")", _queue); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index d5db87350b..f09b03ab85 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -21,25 +21,22 @@ package org.apache.qpid.server.protocol; import junit.framework.TestCase; - import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import javax.management.JMException; +import java.security.Principal; -/** - * Test class to test MBean operations for AMQMinaProtocolSession. - */ +/** Test class to test MBean operations for AMQMinaProtocolSession. */ public class AMQProtocolSessionMBeanTest extends TestCase { /** Used for debugging. */ @@ -56,11 +53,11 @@ public class AMQProtocolSessionMBeanTest extends TestCase int channelCount = _mbean.channels().size(); assertTrue(channelCount == 1); AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue_" + System.currentTimeMillis()), - false, - new AMQShortString("test"), - true, - _protocolSession.getVirtualHost(), null); - AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore); + false, + new AMQShortString("test"), + true, + _protocolSession.getVirtualHost(), null); + AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); channel.setDefaultQueue(queue); _protocolSession.addChannel(channel); channelCount = _mbean.channels().size(); @@ -114,8 +111,16 @@ public class AMQProtocolSessionMBeanTest extends TestCase IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); _protocolSession = - new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true), - null); + new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true), + null); + // Need to authenticate session for it to work, (well for logging to work) + _protocolSession.setAuthorizedID(new Principal() + { + public String getName() + { + return "AMQProtocolSessionMBeanTestUser"; + } + }); _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test")); _channel = new AMQChannel(_protocolSession, 1, _messageStore); _protocolSession.addChannel(_channel); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index da35ddc594..49c5f8a14b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -34,6 +34,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.net.SocketAddress; public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java index 1bdabf345b..9597c1319a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java @@ -30,6 +30,8 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; +import java.security.Principal; + /** Test class to test MBean operations for AMQMinaProtocolSession. */ public class MaxChannelsTest extends TestCase { @@ -40,6 +42,16 @@ public class MaxChannelsTest extends TestCase { _session = new AMQMinaProtocolSession(new TestIoSession(), _appRegistry .getVirtualHostRegistry(), new AMQCodecFactory(true), null); + + // Need to authenticate session for it to work, (well for logging to work) + _session.setAuthorizedID(new Principal() + { + public String getName() + { + return "AMQProtocolSessionMBeanTestUser"; + } + }); + _session.setVirtualHost(_appRegistry.getVirtualHostRegistry().getVirtualHost("test")); // check the channel count is correct diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index d7bd297998..bfbb3e19e5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -46,6 +46,7 @@ public class MockAMQQueue implements AMQQueue { private boolean _deleted = false; private AMQShortString _name; + private VirtualHost _virtualhost; private PrincipalHolder _principalHolder; @@ -79,9 +80,14 @@ public class MockAMQQueue implements AMQQueue return null; //To change body of implemented methods use File | Settings | File Templates. } + public void setVirtualHost(VirtualHost virtualhost) + { + _virtualhost = virtualhost; + } + public VirtualHost getVirtualHost() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return _virtualhost; } public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java index ad6c95db8e..fe79c40bb9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -34,7 +34,9 @@ import org.apache.qpid.transport.Sender; import javax.security.sasl.SaslServer; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import java.security.Principal; +import java.net.SocketAddress; /** * A protocol session that can be used for testing purposes. @@ -45,11 +47,21 @@ public class MockProtocolSession implements AMQProtocolSession private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); + private static final AtomicLong idGenerator = new AtomicLong(0); + + private final long _sessionID = idGenerator.getAndIncrement(); + private VirtualHost _virtualHost; + public MockProtocolSession(MessageStore messageStore) { _messageStore = messageStore; } + public long getSessionID() + { + return _sessionID; + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { } @@ -158,12 +170,12 @@ public class MockProtocolSession implements AMQProtocolSession public VirtualHost getVirtualHost() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return _virtualHost; } public void setVirtualHost(VirtualHost virtualHost) { - //To change body of implemented methods use File | Settings | File Templates. + _virtualHost = virtualHost; } public void addSessionCloseTask(Task task) @@ -188,6 +200,18 @@ public class MockProtocolSession implements AMQProtocolSession public Principal getPrincipal() { + return new Principal() + { + public String getName() + { + return "MockProtocolSessionUser"; + } + }; + + } + + public SocketAddress getRemoteAddress() + { return null; //To change body of implemented methods use File | Settings | File Templates. } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java index 83bcd03177..f56d562354 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; public class FirewallPluginTest extends TestCase { @@ -89,11 +90,13 @@ public class FirewallPluginTest extends TestCase public void setUp() throws Exception { _store = new TestableMemoryMessageStore(); - PropertiesConfiguration env = new PropertiesConfiguration(); - _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env)); TestIoSession iosession = new TestIoSession(); iosession.setAddress("127.0.0.1"); - VirtualHostRegistry virtualHostRegistry = null; + + // Retreive VirtualHost from the Registry + VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry(); + _virtualHost = virtualHostRegistry.getVirtualHost("test"); + AMQCodecFactory codecFactory = new AMQCodecFactory(true); _session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index c7ea2067a6..94f7ff33a6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -21,10 +21,6 @@ package org.apache.qpid.server.subscription; * */ -import java.util.ArrayList; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; @@ -32,6 +28,11 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + public class MockSubscription implements Subscription { @@ -44,6 +45,10 @@ public class MockSubscription implements Subscription private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); private final Lock _stateChangeLock = new ReentrantLock(); + private static final AtomicLong idGenerator = new AtomicLong(0); + // Create a simple ID that increments for ever new Subscription + private final long _subscriptionID = idGenerator.getAndIncrement(); + public void close() { _closed = true; @@ -66,7 +71,12 @@ public class MockSubscription implements Subscription public AMQShortString getConsumerTag() { - return tag ; + return tag; + } + + public long getSubscriptionID() + { + return _subscriptionID; } public AMQQueue.Context getQueueContext() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 101c33043d..585ed9a538 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -21,30 +21,31 @@ package org.apache.qpid.server.util; import junit.framework.TestCase; - import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.AMQException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.InternalTestProtocolSession; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.ConsumerTagNotUniqueException; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.AMQException; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.util.MockChannel; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.exchange.ExchangeDefaults; + +import java.security.Principal; public class InternalBrokerBaseCase extends TestCase { @@ -64,7 +65,7 @@ public class InternalBrokerBaseCase extends TestCase configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName()); _registry = new TestApplicationRegistry(new ServerConfiguration(configuration)); ApplicationRegistry.initialise(_registry); - _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); + _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); _messageStore = _virtualHost.getMessageStore(); @@ -80,6 +81,14 @@ public class InternalBrokerBaseCase extends TestCase _session = new InternalTestProtocolSession(); + _session.setAuthorizedID(new Principal() + { + public String getName() + { + return "InternalBrokerBaseCaseUser"; + } + }); + _session.setVirtualHost(_virtualHost); _channel = new MockChannel(_session, 1, _messageStore); @@ -176,7 +185,7 @@ public class InternalBrokerBaseCase extends TestCase for (int count = 0; count < messages; count++) { - channel.setPublishFrame(info, _virtualHost.getExchangeRegistry().getExchange(info.getExchange())); + channel.setPublishFrame(info, _virtualHost.getExchangeRegistry().getExchange(info.getExchange())); //Set the body size ContentHeaderBody _headerBody = new ContentHeaderBody(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java index c6ecac6a01..84bee7984b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -87,7 +87,7 @@ public class TestApplicationRegistry extends ApplicationRegistry _messageStore = new TestableMemoryMessageStore(); - _virtualHostRegistry = new VirtualHostRegistry(); + _virtualHostRegistry = new VirtualHostRegistry(this); PropertiesConfiguration vhostProps = new PropertiesConfiguration(); VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps); diff --git a/qpid/java/build.deps b/qpid/java/build.deps index fa5e3da4db..709f2a478e 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -169,6 +169,7 @@ tools.test.libs=${client.test.libs} testkit.test.libs=${test.libs} management-client.test.libs=${muse.libs} ${test.libs} ${log4j} ${javassist} ${geronimo-servlet} ${commons-pool} management-console.test.libs=${junit4} ${slf4j-log4j} ${log4j} ${client.libs} +management-agent.test.libs=${junit} management-eclipse-plugin.test.libs=${systests.libs} broker-plugins.test.libs=${test.libs} management-tools-qpid-cli.test.libs=${junit4} ${slf4j-log4j} ${log4j} ${client.libs} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java index 63222b50db..515c849290 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java @@ -20,7 +20,18 @@ */ package org.apache.qpid.util; -import java.io.*; +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.LinkedList; +import java.util.List; /** * FileUtils provides some simple helper methods for working with files. It follows the convention of wrapping all @@ -46,7 +57,8 @@ public class FileUtils { BufferedInputStream is = null; - try{ + try + { try { is = new BufferedInputStream(new FileInputStream(filename)); @@ -57,7 +69,9 @@ public class FileUtils } return readStreamAsString(is); - }finally { + } + finally + { if (is != null) { try @@ -210,68 +224,69 @@ public class FileUtils /* * Deletes a given file */ - public static boolean deleteFile(String filePath) - { - return delete(new File(filePath), false); - } + public static boolean deleteFile(String filePath) + { + return delete(new File(filePath), false); + } /* * Deletes a given empty directory */ - public static boolean deleteDirectory(String directoryPath) - { - File directory = new File(directoryPath); - - if (directory.isDirectory()) - { - if (directory.listFiles().length == 0) - { - return delete(directory, true); - } - } - - return false; - } - - /** - * Delete a given file/directory, - * A directory will always require the recursive flag to be set. - * if a directory is specified and recursive set then delete the whole tree - * @param file the File object to start at - * @param recursive boolean to recurse if a directory is specified. - * @return <code>true</code> if and only if the file or directory is - * successfully deleted; <code>false</code> otherwise - */ - public static boolean delete(File file, boolean recursive) - { - boolean success = true; - - if (file.isDirectory()) - { - if (recursive) - { - File[] files = file.listFiles(); - - // This can occur if the file is deleted outside the JVM - if (files == null) - { - return false; - } - - for (int i = 0; i < files.length; i++) - { - success = delete(files[i], true) && success; - } - - return success && file.delete(); - } - - return false; - } - - return file.delete(); - } + public static boolean deleteDirectory(String directoryPath) + { + File directory = new File(directoryPath); + if (directory.isDirectory()) + { + if (directory.listFiles().length == 0) + { + return delete(directory, true); + } + } + + return false; + } + + /** + * Delete a given file/directory, + * A directory will always require the recursive flag to be set. + * if a directory is specified and recursive set then delete the whole tree + * + * @param file the File object to start at + * @param recursive boolean to recurse if a directory is specified. + * + * @return <code>true</code> if and only if the file or directory is + * successfully deleted; <code>false</code> otherwise + */ + public static boolean delete(File file, boolean recursive) + { + boolean success = true; + + if (file.isDirectory()) + { + if (recursive) + { + File[] files = file.listFiles(); + + // This can occur if the file is deleted outside the JVM + if (files == null) + { + return false; + } + + for (int i = 0; i < files.length; i++) + { + success = delete(files[i], true) && success; + } + + return success && file.delete(); + } + + return false; + } + + return file.delete(); + } public static class UnableToCopyException extends Exception { @@ -294,7 +309,6 @@ public class FileUtils throw new IllegalArgumentException("Unable to copy '" + source.toString() + "' to '" + dst + "' a file with same name exists."); } - if (source.isFile()) { copy(source, dst); @@ -303,22 +317,48 @@ public class FileUtils //else we have a source directory if (!dst.isDirectory() && !dst.mkdir()) { - throw new UnableToCopyException("Unable to create destination directory"); + throw new UnableToCopyException("Unable to create destination directory"); } - for (File file : source.listFiles()) { - if (file.isFile()) - { - copy(file, new File(dst.toString() + File.separator + file.getName())); - } - else - { - copyRecursive(file, new File(dst + File.separator + file.getName())); - } + if (file.isFile()) + { + copy(file, new File(dst.toString() + File.separator + file.getName())); + } + else + { + copyRecursive(file, new File(dst + File.separator + file.getName())); + } } + } + + /** + * Checks the specified file for instances of the search string. + * + * @param file the file to search + * @param search the search String + * + * @throws java.io.IOException + * @return the list of matching entries + */ + public static List<String> searchFile(File file, String search) + throws IOException + { + + List<String> results = new LinkedList<String>(); + + BufferedReader reader = new BufferedReader(new FileReader(file)); + while (reader.ready()) + { + String line = reader.readLine(); + if (line.contains(search)) + { + results.add(line); + } + } + return results; } } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java b/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java index 94e7e20a86..7eba5f092e 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java @@ -22,11 +22,12 @@ package org.apache.qpid.util; import junit.framework.TestCase; -import java.io.File; -import java.io.IOException; import java.io.BufferedWriter; -import java.io.FileWriter; +import java.io.File; import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; public class FileUtilsTest extends TestCase { @@ -47,7 +48,7 @@ public class FileUtilsTest extends TestCase //Create initial file File test = createTestFile(fileName, TEST_DATA); - + try { //Check number of files before copy @@ -137,7 +138,6 @@ public class FileUtilsTest extends TestCase testSubDir.deleteOnExit(); testDir.deleteOnExit(); - //Perform Copy File copyDir = new File(testDir.toString() + COPY); try @@ -282,7 +282,7 @@ public class FileUtilsTest extends TestCase public void testDeleteNonExistentFile() { - File test = new File("FileUtilsTest-testDelete-"+System.currentTimeMillis()); + File test = new File("FileUtilsTest-testDelete-" + System.currentTimeMillis()); assertTrue("File exists", !test.exists()); assertFalse("File is a directory", test.isDirectory()); @@ -303,7 +303,6 @@ public class FileUtilsTest extends TestCase } } - /** * Given two lists of File arrays ensure they are the same length and all entries in Before are in After * @@ -543,4 +542,71 @@ public class FileUtilsTest extends TestCase } } + public static final String SEARCH_STRING = "testSearch"; + + /** + * Test searchFile(File file, String search) will find a match when it + * exists. + * + * @throws java.io.IOException if unable to perform test setup + */ + public void testSearchSucceed() throws IOException + { + File _logfile = File.createTempFile("FileUtilsTest-testSearchSucceed", ".out"); + + prepareFileForSearchTest(_logfile); + + List<String> results = FileUtils.searchFile(_logfile, SEARCH_STRING); + + assertNotNull("Null result set returned", results); + + assertEquals("Results do not contain expected count", 1, results.size()); + } + + /** + * Test searchFile(File file, String search) will not find a match when the + * test string does not exist. + * + * @throws java.io.IOException if unable to perform test setup + */ + public void testSearchFail() throws IOException + { + File _logfile = File.createTempFile("FileUtilsTest-testSearchFail", ".out"); + + prepareFileForSearchTest(_logfile); + + List<String> results = FileUtils.searchFile(_logfile, "Hello"); + + assertNotNull("Null result set returned", results); + + //Validate we only got one message + if (results.size() > 0) + { + System.err.println("Unexpected messages"); + + for (String msg : results) + { + System.err.println(msg); + } + } + + assertEquals("Results contains data when it was not expected", + 0, results.size()); + } + + /** + * Write the SEARCH_STRING in to the given file. + * + * @param logfile The file to write the SEARCH_STRING into + * + * @throws IOException if an error occurs + */ + private void prepareFileForSearchTest(File logfile) throws IOException + { + BufferedWriter writer = new BufferedWriter(new FileWriter(logfile)); + writer.append(SEARCH_STRING); + writer.flush(); + writer.close(); + } + } diff --git a/qpid/java/module.xml b/qpid/java/module.xml index 5d0c0a9541..9bf0270bd6 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -373,7 +373,7 @@ <target name="libs-release" description="copy dependencies into module release"> <!-- Copy the module dependencies --> - <copylist todir="${module.release}" dir="${project.root}" files="${module.libs}"/> + <copylist todir="${module.release}/lib" dir="${project.root}" files="${module.libs}"/> <!-- Copy the jar for this module --> <copy todir="${module.release}/lib" failonerror="true"> <fileset file="${module.jar}"/> diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 737ed2322f..2216758c2e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -152,6 +152,11 @@ public class SubscriptionTestHelper implements Subscription return null; //To change body of implemented methods use File | Settings | File Templates. } + public long getSubscriptionID() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + public boolean isActive() { return false; //To change body of implemented methods use File | Settings | File Templates. diff --git a/qpid/python/mllib/__init__.py b/qpid/python/mllib/__init__.py index 39e9363614..9aa1e56e66 100644 --- a/qpid/python/mllib/__init__.py +++ b/qpid/python/mllib/__init__.py @@ -24,6 +24,8 @@ both SGML and XML. import os, dom, transforms, parsers, sys import xml.sax, types +from xml.sax.handler import ErrorHandler +from xml.sax.xmlreader import InputSource from cStringIO import StringIO def transform(node, *args): @@ -49,15 +51,33 @@ def sgml_parse(source): p.close() return p.parser.tree -def xml_parse(filename): +class Resolver: + + def __init__(self, path): + self.path = path + + def resolveEntity(self, publicId, systemId): + for p in self.path: + fname = os.path.join(p, systemId) + if os.path.exists(fname): + source = InputSource(systemId) + source.setByteStream(open(fname)) + return source + return InputSource(systemId) + +def xml_parse(filename, path=()): if sys.version_info[0:2] == (2,3): # XXX: this is for older versions of python - source = "file://%s" % os.path.abspath(filename) + source = "file://%s" % os.path.abspath(filename) else: source = filename - p = parsers.XMLParser() - xml.sax.parse(source, p) - return p.parser.tree + h = parsers.XMLParser() + p = xml.sax.make_parser() + p.setContentHandler(h) + p.setErrorHandler(ErrorHandler()) + p.setEntityResolver(Resolver(path)) + p.parse(source) + return h.parser.tree def sexp(node): s = transforms.Sexp() diff --git a/qpid/specs/amqp.0-10-qpid-errata.xml b/qpid/specs/amqp.0-10-qpid-errata.xml index 5a554b0ab7..365928ea4e 100644 --- a/qpid/specs/amqp.0-10-qpid-errata.xml +++ b/qpid/specs/amqp.0-10-qpid-errata.xml @@ -121,7 +121,7 @@ --> -<!--<!DOCTYPE amqp SYSTEM "amqp.0-10.dtd">--> +<!DOCTYPE amqp SYSTEM "amqp.0-10.dtd"> <amqp xmlns="http://www.amqp.org/schema/amqp.xsd" major="0" minor="10" port="5672"> |