/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.registry; import java.util.Collection; import java.util.Timer; import java.util.TimerTask; import org.apache.log4j.Logger; import org.apache.qpid.common.Closeable; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.ConfigurationEntryStore; import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer; import org.apache.qpid.server.configuration.RecovererProvider; import org.apache.qpid.server.configuration.startup.DefaultRecovererProvider; import org.apache.qpid.server.logging.CompositeStartupMessageLogger; import org.apache.qpid.server.logging.Log4jMessageLogger; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.AbstractActor; import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.State; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; /** * An abstract application registry that provides access to configuration information and handles the * construction and caching of configurable objects. *
* Subclasses should handle the construction of the "registered objects" such as the exchange registry. */ public class ApplicationRegistry implements IApplicationRegistry { private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class); private final VirtualHostRegistry _virtualHostRegistry = new VirtualHostRegistry(); private volatile RootMessageLogger _rootMessageLogger; private Broker _broker; private Timer _reportingTimer; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private LogRecorder _logRecorder; private ConfigurationEntryStore _store; private TaskExecutor _taskExecutor; protected void setRootMessageLogger(RootMessageLogger rootMessageLogger) { _rootMessageLogger = rootMessageLogger; } public ApplicationRegistry(ConfigurationEntryStore store) { _store = store; initialiseStatistics(); } public void initialise() throws Exception { // Create the RootLogger to be used during broker operation boolean statusUpdatesEnabled = Boolean.parseBoolean(System.getProperty(BrokerProperties.PROPERTY_STATUS_UPDATES, "true")); _rootMessageLogger = new Log4jMessageLogger(statusUpdatesEnabled); _logRecorder = new LogRecorder(); //Create the composite (log4j+SystemOut MessageLogger to be used during startup RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger}; CompositeStartupMessageLogger startupMessageLogger = new CompositeStartupMessageLogger(messageLoggers); BrokerActor actor = new BrokerActor(startupMessageLogger); CurrentActor.set(actor); CurrentActor.setDefault(actor); GenericActor.setDefaultMessageLogger(_rootMessageLogger); try { logStartupMessages(CurrentActor.get()); _taskExecutor = new TaskExecutor(); _taskExecutor.start(); RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor); ConfiguredObjectRecoverer extends ConfiguredObject> brokerRecoverer = provider.getRecoverer(Broker.class.getSimpleName()); _broker = (Broker) brokerRecoverer.create(provider, _store.getRootEntry()); _virtualHostRegistry.setDefaultVirtualHostName((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)); initialiseStatisticsReporting(); // starting the broker _broker.setDesiredState(State.INITIALISING, State.ACTIVE); CurrentActor.get().message(BrokerMessages.READY()); } finally { CurrentActor.remove(); } CurrentActor.setDefault(new BrokerActor(_rootMessageLogger)); } private void initialiseStatisticsReporting() { long report = ((Number)_broker.getAttribute(Broker.STATISTICS_REPORTING_PERIOD)).intValue() * 1000; // convert to ms final boolean reset = (Boolean)_broker.getAttribute(Broker.STATISTICS_REPORTING_RESET_ENABLED); /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */ if (report > 0L) { _reportingTimer = new Timer("Statistics-Reporting", true); StatisticsReportingTask task = new StatisticsReportingTask(reset, _rootMessageLogger); _reportingTimer.scheduleAtFixedRate(task, report / 2, report); } } private class StatisticsReportingTask extends TimerTask { private final int DELIVERED = 0; private final int RECEIVED = 1; private final boolean _reset; private final RootMessageLogger _logger; public StatisticsReportingTask(boolean reset, RootMessageLogger logger) { _reset = reset; _logger = logger; } public void run() { CurrentActor.set(new AbstractActor(_logger) { public String getLogMessage() { return "[" + Thread.currentThread().getName() + "] "; } }); try { CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal())); CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal())); CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal())); CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal())); Collection