/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.registry; import java.util.Collection; import java.util.Timer; import java.util.TimerTask; import org.apache.log4j.Logger; import org.apache.qpid.common.Closeable; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.ConfigurationEntryStore; import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer; import org.apache.qpid.server.configuration.RecovererProvider; import org.apache.qpid.server.configuration.startup.DefaultRecovererProvider; import org.apache.qpid.server.logging.CompositeStartupMessageLogger; import org.apache.qpid.server.logging.Log4jMessageLogger; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.AbstractActor; import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.State; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; /** * An abstract application registry that provides access to configuration information and handles the * construction and caching of configurable objects. *

* Subclasses should handle the construction of the "registered objects" such as the exchange registry. */ public class ApplicationRegistry implements IApplicationRegistry { private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class); private final VirtualHostRegistry _virtualHostRegistry = new VirtualHostRegistry(); private volatile RootMessageLogger _rootMessageLogger; private Broker _broker; private Timer _reportingTimer; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private LogRecorder _logRecorder; private ConfigurationEntryStore _store; private TaskExecutor _taskExecutor; protected void setRootMessageLogger(RootMessageLogger rootMessageLogger) { _rootMessageLogger = rootMessageLogger; } public ApplicationRegistry(ConfigurationEntryStore store) { _store = store; initialiseStatistics(); } public void initialise() throws Exception { // Create the RootLogger to be used during broker operation boolean statusUpdatesEnabled = Boolean.parseBoolean(System.getProperty(BrokerProperties.PROPERTY_STATUS_UPDATES, "true")); _rootMessageLogger = new Log4jMessageLogger(statusUpdatesEnabled); _logRecorder = new LogRecorder(); //Create the composite (log4j+SystemOut MessageLogger to be used during startup RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger}; CompositeStartupMessageLogger startupMessageLogger = new CompositeStartupMessageLogger(messageLoggers); BrokerActor actor = new BrokerActor(startupMessageLogger); CurrentActor.set(actor); CurrentActor.setDefault(actor); GenericActor.setDefaultMessageLogger(_rootMessageLogger); try { logStartupMessages(CurrentActor.get()); _taskExecutor = new TaskExecutor(); _taskExecutor.start(); RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor); ConfiguredObjectRecoverer brokerRecoverer = provider.getRecoverer(Broker.class.getSimpleName()); _broker = (Broker) brokerRecoverer.create(provider, _store.getRootEntry()); _virtualHostRegistry.setDefaultVirtualHostName((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)); initialiseStatisticsReporting(); // starting the broker _broker.setDesiredState(State.INITIALISING, State.ACTIVE); CurrentActor.get().message(BrokerMessages.READY()); } finally { CurrentActor.remove(); } CurrentActor.setDefault(new BrokerActor(_rootMessageLogger)); } private void initialiseStatisticsReporting() { long report = ((Number)_broker.getAttribute(Broker.STATISTICS_REPORTING_PERIOD)).intValue() * 1000; // convert to ms final boolean reset = (Boolean)_broker.getAttribute(Broker.STATISTICS_REPORTING_RESET_ENABLED); /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */ if (report > 0L) { _reportingTimer = new Timer("Statistics-Reporting", true); StatisticsReportingTask task = new StatisticsReportingTask(reset, _rootMessageLogger); _reportingTimer.scheduleAtFixedRate(task, report / 2, report); } } private class StatisticsReportingTask extends TimerTask { private final int DELIVERED = 0; private final int RECEIVED = 1; private final boolean _reset; private final RootMessageLogger _logger; public StatisticsReportingTask(boolean reset, RootMessageLogger logger) { _reset = reset; _logger = logger; } public void run() { CurrentActor.set(new AbstractActor(_logger) { public String getLogMessage() { return "[" + Thread.currentThread().getName() + "] "; } }); try { CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal())); CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal())); CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal())); CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal())); Collection hosts = _virtualHostRegistry.getVirtualHosts(); if (hosts.size() > 1) { for (VirtualHost vhost : hosts) { String name = vhost.getName(); StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics(); StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics(); StatisticsCounter dataReceived = vhost.getDataReceiptStatistics(); StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics(); CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal())); CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal())); CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal())); } } if (_reset) { resetStatistics(); } } catch(Exception e) { ApplicationRegistry._logger.warn("Unexpected exception occured while reporting the statistics", e); } finally { CurrentActor.remove(); } } } /** * Close non-null Closeable items and log any errors * @param close */ private void close(Closeable close) { try { if (close != null) { close.close(); } } catch (Throwable e) { _logger.error("Error thrown whilst closing " + close.getClass().getSimpleName(), e); } } public void close() { if (_logger.isInfoEnabled()) { _logger.info("Shutting down ApplicationRegistry:" + this); } //Set the Actor for Broker Shutdown CurrentActor.set(new BrokerActor(_rootMessageLogger)); try { //Stop Statistics Reporting if (_reportingTimer != null) { _reportingTimer.cancel(); } if (_broker != null) { _broker.setDesiredState(_broker.getActualState(), State.STOPPED); } //Shutdown virtualhosts close(_virtualHostRegistry); if (_taskExecutor != null) { _taskExecutor.stop(); } CurrentActor.get().message(BrokerMessages.STOPPED()); _logRecorder.closeLogRecorder(); } finally { if (_taskExecutor != null) { _taskExecutor.stopImmediately(); } CurrentActor.remove(); } _store = null; _broker = null; } public void registerMessageDelivered(long messageSize) { _messagesDelivered.registerEvent(1L); _dataDelivered.registerEvent(messageSize); } public void registerMessageReceived(long messageSize, long timestamp) { _messagesReceived.registerEvent(1L, timestamp); _dataReceived.registerEvent(messageSize, timestamp); } public StatisticsCounter getMessageReceiptStatistics() { return _messagesReceived; } public StatisticsCounter getDataReceiptStatistics() { return _dataReceived; } public StatisticsCounter getMessageDeliveryStatistics() { return _messagesDelivered; } public StatisticsCounter getDataDeliveryStatistics() { return _dataDelivered; } public void resetStatistics() { _messagesDelivered.reset(); _dataDelivered.reset(); _messagesReceived.reset(); _dataReceived.reset(); for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts()) { vhost.resetStatistics(); } } public void initialiseStatistics() { _messagesDelivered = new StatisticsCounter("messages-delivered"); _dataDelivered = new StatisticsCounter("bytes-delivered"); _messagesReceived = new StatisticsCounter("messages-received"); _dataReceived = new StatisticsCounter("bytes-received"); } private void logStartupMessages(LogActor logActor) { logActor.message(BrokerMessages.STARTUP(QpidProperties.getReleaseVersion(), QpidProperties.getBuildVersion())); logActor.message(BrokerMessages.PLATFORM(System.getProperty("java.vendor"), System.getProperty("java.runtime.version", System.getProperty("java.version")), System.getProperty("os.name"), System.getProperty("os.version"), System.getProperty("os.arch"))); logActor.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory())); } @Override public Broker getBroker() { return _broker; } }