diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-04-12 19:46:03 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-04-12 19:46:03 +0000 |
commit | 258e354c633f5efd66d5e57550609b83b37cc330 (patch) | |
tree | d75ccdf4dacde868336ad4adf6189e71b1eb08a9 | |
parent | bbda9004d532dc5f5f18553edf43456161a186db (diff) | |
download | qpid-python-258e354c633f5efd66d5e57550609b83b37cc330.tar.gz |
QPID-3917 : Refactor changes to MessageStore to use listeners and not decorators
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1325467 13f79535-47bb-0310-9956-ffa450edef68
29 files changed, 726 insertions, 793 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index f0fd97dc85..3f394b79a7 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -33,10 +33,18 @@ import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; import com.sleepycat.je.TransactionConfig; - +import java.io.File; +import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; - import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -52,6 +60,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHa import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; @@ -83,17 +92,6 @@ import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; -import java.io.File; -import java.lang.ref.SoftReference; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; - public abstract class AbstractBDBMessageStore implements MessageStore { private static final Logger LOGGER = Logger.getLogger(AbstractBDBMessageStore.class); @@ -154,7 +152,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore private final AtomicLong _messageId = new AtomicLong(0); - protected final StateManager _stateManager = new StateManager(); + protected final StateManager _stateManager; protected TransactionConfig _transactionConfig = new TransactionConfig(); @@ -165,20 +163,27 @@ public abstract class AbstractBDBMessageStore implements MessageStore private TransactionLogRecoveryHandler _tlogRecoveryHandler; private ConfigurationRecoveryHandler _configRecoveryHandler; + + private final EventManager _eventManager = new EventManager(); + private String _storeLocation; public AbstractBDBMessageStore() { + _stateManager = new StateManager(_eventManager); } public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration storeConfiguration) throws Exception { - _stateManager.stateTransition(State.INITIAL, State.CONFIGURING); + _stateManager.attainState(State.CONFIGURING); _configRecoveryHandler = recoveryHandler; configure(name,storeConfiguration); + + + } public void configureMessageStore(String name, @@ -188,16 +193,19 @@ public abstract class AbstractBDBMessageStore implements MessageStore { _messageRecoveryHandler = messageRecoveryHandler; _tlogRecoveryHandler = tlogRecoveryHandler; + + _stateManager.attainState(State.CONFIGURED); } public void activate() throws Exception { - _stateManager.stateTransition(State.CONFIGURING, State.RECOVERING); + _stateManager.attainState(State.RECOVERING); recoverConfig(_configRecoveryHandler); recoverMessages(_messageRecoveryHandler); recoverQueueEntries(_tlogRecoveryHandler); - _stateManager.stateTransition(State.RECOVERING, State.ACTIVE); + + _stateManager.attainState(State.ACTIVE); } public org.apache.qpid.server.store.Transaction newTransaction() @@ -216,8 +224,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void configure(String name, Configuration storeConfig) throws Exception { - File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY, - System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name)); + final String storeLocation = storeConfig.getString(ENVIRONMENT_PATH_PROPERTY, + System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name); + + File environmentPath = new File(storeLocation); if (!environmentPath.exists()) { if (!environmentPath.mkdirs()) @@ -227,6 +237,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } + _storeLocation = storeLocation; + configure(environmentPath, false); } @@ -359,7 +371,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void close() throws Exception { - if (_stateManager.isInState(State.ACTIVE)) + if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.QUIESCED)) { _stateManager.stateTransition(State.ACTIVE, State.CLOSING); @@ -1975,13 +1987,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void addEventListener(EventListener eventListener, Event event) + public void addEventListener(EventListener eventListener, Event... events) { - throw new UnsupportedOperationException(); + _eventManager.addEventListener(eventListener, events); } - public MessageStore getUnderlyingStore() + @Override + public String getStoreLocation() { - return this; + return _storeLocation; } } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java index 145d0f35e3..7e5ef3f94c 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -19,19 +19,16 @@ */ package org.apache.qpid.server.store.berkeleydb; -import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreFactory; -import org.apache.qpid.server.store.decorators.EventDecorator; -import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator; public class BDBMessageStoreFactory implements MessageStoreFactory { @Override - public MessageStore createMessageStore(LogSubject logSubject) + public MessageStore createMessageStore() { - return new OperationalLoggingDecorator(new EventDecorator(new BDBMessageStore()), logSubject); + return new BDBMessageStore(); } @Override diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index a45b98b8b5..5b69c2ead6 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -369,10 +369,10 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto } private BDBMessageStore assertBDBStore(MessageStore store) { - MessageStore underlyingStore = store.getUnderlyingStore(); - assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, underlyingStore.getClass()); - return (BDBMessageStore) underlyingStore; + assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass()); + + return (BDBMessageStore) store; } private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store) diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/Event.java b/java/broker/src/main/java/org/apache/qpid/server/store/Event.java index 95b0186027..bbde11ab4c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/Event.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/Event.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.store; public enum Event { + BEFORE_INIT, + AFTER_INIT, BEFORE_ACTIVATE, AFTER_ACTIVATE, BEFORE_PASSIVATE, diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java b/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java index 3e10f758f9..21ae3924b8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java @@ -20,22 +20,29 @@ package org.apache.qpid.server.store; import java.util.ArrayList; +import java.util.EnumMap; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; public class EventManager { - private ConcurrentMap<Event, List<EventListener>> _listeners = new ConcurrentHashMap<Event, List<EventListener>>(); + private Map<Event, List<EventListener>> _listeners = new EnumMap<Event, List<EventListener>> (Event.class); - public void addEventListener(EventListener listener, Event event) + public synchronized void addEventListener(EventListener listener, Event... events) { - _listeners.putIfAbsent(event, new ArrayList<EventListener>()); - final List<EventListener> list = _listeners.get(event); - list.add(listener); + for(Event event : events) + { + List<EventListener> list = _listeners.get(event); + if(list == null) + { + list = new ArrayList<EventListener>(); + _listeners.put(event,list); + } + list.add(listener); + } } - public void notifyEvent(Event event) + public synchronized void notifyEvent(Event event) { if (_listeners.containsKey(event)) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index de9e73f914..59624b7a75 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.store; +import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueableMessage; @@ -71,6 +72,34 @@ public class MemoryMessageStore extends NullMessageStore } }; + private final StateManager _stateManager; + private final EventManager _eventManager = new EventManager(); + + public MemoryMessageStore() + { + _stateManager = new StateManager(_eventManager); + } + + @Override + public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception + { + _stateManager.attainState(State.CONFIGURING); + } + + @Override + public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception + { + _stateManager.attainState(State.CONFIGURED); + } + + @Override + public void activate() throws Exception + { + _stateManager.attainState(State.RECOVERING); + + _stateManager.attainState(State.ACTIVE); + } + @Override public StoredMessage addMessage(StorableMessageMetaData metaData) { @@ -95,6 +124,14 @@ public class MemoryMessageStore extends NullMessageStore @Override public void close() throws Exception { + _stateManager.attainState(State.CLOSING); _closed.getAndSet(true); + _stateManager.attainState(State.CLOSED); + } + + @Override + public void addEventListener(EventListener eventListener, Event... events) + { + _eventManager.addEventListener(eventListener, events); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java index 0fb7b1f84f..8724f102c6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java @@ -19,17 +19,13 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.store.decorators.EventDecorator; -import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator; - public class MemoryMessageStoreFactory implements MessageStoreFactory { @Override - public MessageStore createMessageStore(LogSubject logSubject) + public MessageStore createMessageStore() { - return new OperationalLoggingDecorator(new EventDecorator(new MemoryMessageStore()), logSubject); + return new MemoryMessageStore(); } @Override diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index c76d4f223f..cf08ee00ff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -64,8 +64,7 @@ public interface MessageStore extends DurableConfigurationStore */ void close() throws Exception; - void addEventListener(EventListener eventListener, Event event); - - MessageStore getUnderlyingStore(); + void addEventListener(EventListener eventListener, Event... events); + String getStoreLocation(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java index 878798eac3..a35db62b03 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java @@ -19,11 +19,9 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.logging.LogSubject; - public interface MessageStoreFactory { - MessageStore createMessageStore(LogSubject logSubject); + MessageStore createMessageStore(); String getStoreClassName(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 0b55f74730..77df6c5abf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -133,13 +133,14 @@ public class NullMessageStore implements MessageStore } @Override - public void addEventListener(EventListener eventListener, Event event) + public void addEventListener(EventListener eventListener, Event... events) { } @Override - public MessageStore getUnderlyingStore() + public String getStoreLocation() { - return this; + return null; } + }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java b/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java new file mode 100644 index 0000000000..caff17daa5 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; + +public class OperationalLoggingListener implements EventListener +{ + protected final LogSubject _logSubject; + private MessageStore _store; + + private OperationalLoggingListener(final MessageStore store, LogSubject logSubject) + { + _logSubject = logSubject; + store.addEventListener(this, Event.BEFORE_INIT, Event.AFTER_INIT, Event.BEFORE_ACTIVATE, Event.AFTER_ACTIVATE, Event.AFTER_CLOSE); + _store = store; + } + + public void event(Event event) + { + switch(event) + { + case BEFORE_INIT: + CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED()); + break; + case AFTER_INIT: + CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED()); + CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED()); + String storeLocation = _store.getStoreLocation(); + if (storeLocation != null) + { + CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(storeLocation)); + } + break; + case BEFORE_ACTIVATE: + CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_START()); + break; + case AFTER_ACTIVATE: + CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE()); + break; + case AFTER_CLOSE: + CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); + break; + + } + } + + public static void listen(final MessageStore store, LogSubject logSubject) + { + new OperationalLoggingListener(store, logSubject); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/State.java b/java/broker/src/main/java/org/apache/qpid/server/store/State.java index 7928f613d9..7cbdede85e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/State.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/State.java @@ -22,10 +22,17 @@ package org.apache.qpid.server.store; public enum State { + INITIAL, CONFIGURING, + CONFIGURED, RECOVERING, ACTIVE, + QUIESCING, + QUIESCED, CLOSING, - CLOSED + CLOSED; + + + }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java b/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java index 41b3cb81bb..5998be5bb6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java @@ -21,9 +21,84 @@ package org.apache.qpid.server.store; +import java.util.EnumMap; +import java.util.Map; + public class StateManager { private State _state = State.INITIAL; + private EventListener _eventListener; + + private static final Map<State,Map<State, Transition>> _validTransitions = new EnumMap<State, Map<State, Transition>>(State.class); + + + static class Transition + { + private final Event _event; + private final State _endState; + private final State _startState; + + public Transition(State startState, State endState, Event event) + { + _event = event; + _startState = startState; + _endState = endState; + + Map<State, Transition> stateTransitions = _validTransitions.get(startState); + if(stateTransitions == null) + { + stateTransitions = new EnumMap<State, Transition>(State.class); + _validTransitions.put(startState, stateTransitions); + } + stateTransitions.put(endState, this); + } + + public Event getEvent() + { + return _event; + } + + public State getStartState() + { + return _startState; + } + + public State getEndState() + { + return _endState; + } + + } + + public static final Transition CONFIGURE = new Transition(State.INITIAL, State.CONFIGURING, Event.BEFORE_INIT); + public static final Transition CONFIGURE_COMPLETE = new Transition(State.CONFIGURING, State.CONFIGURED, Event.AFTER_INIT); + public static final Transition RECOVER = new Transition(State.CONFIGURED, State.RECOVERING, Event.BEFORE_ACTIVATE); + public static final Transition ACTIVATE = new Transition(State.RECOVERING, State.ACTIVE, Event.AFTER_ACTIVATE); + public static final Transition CLOSE_ACTIVE = new Transition(State.ACTIVE, State.CLOSING, Event.BEFORE_CLOSE); + public static final Transition CLOSE_QUIESCED = new Transition(State.QUIESCED, State.CLOSING, Event.BEFORE_CLOSE); + public static final Transition CLOSE_COMPLETE = new Transition(State.CLOSING, State.CLOSED, Event.AFTER_CLOSE); + public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_PASSIVATE); + public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.BEFORE_PASSIVATE); + public static final Transition RESTART = new Transition(State.QUIESCED, State.RECOVERING, Event.BEFORE_ACTIVATE); + + + public StateManager(final EventManager eventManager) + { + this(new EventListener() + { + @Override + public void event(Event event) + { + eventManager.notifyEvent(event); + } + }); + } + + + public StateManager(EventListener eventListener) + { + _eventListener = eventListener; + } public synchronized State getState() { @@ -37,9 +112,25 @@ public class StateManager throw new IllegalStateException("Cannot transition to the state: " + desired + "; need to be in state: " + current + "; currently in state: " + _state); } - _state = desired; + attainState(desired); } + public synchronized void attainState(State desired) + { + Transition transition = null; + final Map<State, Transition> stateTransitionMap = _validTransitions.get(_state); + if(stateTransitionMap != null) + { + transition = stateTransitionMap.get(desired); + } + if(transition == null) + { + throw new IllegalStateException("No valid transition from state " + _state + " to state " + desired); + } + _state = desired; + _eventListener.event(transition.getEvent()); + } + public synchronized boolean isInState(State testedState) { return _state.equals(testedState); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java b/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java deleted file mode 100644 index a402e6ee5c..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.decorators; - -import org.apache.commons.configuration.Configuration; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; - -/** - * AbstractDecorator. All methods <bMUST</b> perform simple - * delegation to their equivalent decorated counterpart without - * change. - */ -public class AbstractDecorator implements MessageStore -{ - protected final MessageStore _decoratedStore; - - public AbstractDecorator(MessageStore store) - { - _decoratedStore = store; - } - - @Override - public void configureMessageStore(String name, - MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration config) throws Exception - { - _decoratedStore.configureMessageStore(name, messageRecoveryHandler, - tlogRecoveryHandler, config); - } - - @Override - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception - { - _decoratedStore.configureConfigStore(name, recoveryHandler, config); - } - - @Override - public void activate() throws Exception - { - _decoratedStore.activate(); - } - - @Override - public void close() throws Exception - { - _decoratedStore.close(); - } - - @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage( - T metaData) - { - return _decoratedStore.addMessage(metaData); - } - - @Override - public void createExchange(Exchange exchange) throws AMQStoreException - { - _decoratedStore.createExchange(exchange); - } - - @Override - public boolean isPersistent() - { - return _decoratedStore.isPersistent(); - } - - @Override - public Transaction newTransaction() - { - return _decoratedStore.newTransaction(); - } - - @Override - public void removeExchange(Exchange exchange) throws AMQStoreException - { - _decoratedStore.removeExchange(exchange); - } - - @Override - public void addEventListener(EventListener eventListener, Event event) - { - _decoratedStore.addEventListener(eventListener, event); - } - - @Override - public void bindQueue(Exchange exchange, AMQShortString routingKey, - AMQQueue queue, FieldTable args) throws AMQStoreException - { - _decoratedStore.bindQueue(exchange, routingKey, queue, args); - } - - @Override - public void unbindQueue(Exchange exchange, AMQShortString routingKey, - AMQQueue queue, FieldTable args) throws AMQStoreException - { - _decoratedStore.unbindQueue(exchange, routingKey, queue, args); - } - - @Override - public void createQueue(AMQQueue queue) throws AMQStoreException - { - _decoratedStore.createQueue(queue); - } - - @Override - public void createQueue(AMQQueue queue, FieldTable arguments) - throws AMQStoreException - { - _decoratedStore.createQueue(queue, arguments); - } - - @Override - public void removeQueue(AMQQueue queue) throws AMQStoreException - { - _decoratedStore.removeQueue(queue); - } - - @Override - public void updateQueue(AMQQueue queue) throws AMQStoreException - { - _decoratedStore.updateQueue(queue); - } - - @Override - public void createBrokerLink(BrokerLink link) throws AMQStoreException - { - _decoratedStore.createBrokerLink(link); - } - - @Override - public void deleteBrokerLink(BrokerLink link) throws AMQStoreException - { - _decoratedStore.deleteBrokerLink(link); - } - - @Override - public void createBridge(Bridge bridge) throws AMQStoreException - { - _decoratedStore.createBridge(bridge); - } - - @Override - public void deleteBridge(Bridge bridge) throws AMQStoreException - { - _decoratedStore.deleteBridge(bridge); - } - - @Override - public MessageStore getUnderlyingStore() - { - return _decoratedStore.getUnderlyingStore(); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java b/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java deleted file mode 100644 index dbf179c2e6..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.decorators; - -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.EventManager; -import org.apache.qpid.server.store.MessageStore; - -public class EventDecorator extends AbstractDecorator -{ - protected final EventManager _eventManager; - - public EventDecorator(MessageStore store) - { - super(store); - _eventManager = new EventManager(); - } - - @Override - public void activate() throws Exception - { - _eventManager.notifyEvent(Event.BEFORE_ACTIVATE); - _decoratedStore.activate(); - _eventManager.notifyEvent(Event.AFTER_ACTIVATE); - } - - @Override - public void close() throws Exception - { - _eventManager.notifyEvent(Event.BEFORE_CLOSE); - _decoratedStore.close(); - _eventManager.notifyEvent(Event.AFTER_CLOSE); - } - - @Override - public void addEventListener(EventListener eventListener, Event event) - { - _eventManager.addEventListener(eventListener, event); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java b/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java deleted file mode 100644 index 81d9645c01..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.decorators; - -import static org.apache.qpid.server.store.MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY; - -import org.apache.commons.configuration.Configuration; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ConfigStoreMessages; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; -import org.apache.qpid.server.logging.messages.TransactionLogMessages; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; - -public class OperationalLoggingDecorator extends AbstractDecorator -{ - protected final LogSubject _logSubject; - - public OperationalLoggingDecorator(final MessageStore decoratedStore, LogSubject logSubject) - { - super(decoratedStore); - _logSubject = logSubject; - } - - @Override - public void configureMessageStore(String name, - MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration config) throws Exception - { - CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED()); - CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED()); - - if (config != null && config.getString(ENVIRONMENT_PATH_PROPERTY) != null) - { - CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(config.getString(ENVIRONMENT_PATH_PROPERTY))); - } - - _decoratedStore.configureMessageStore(name, messageRecoveryHandler, - tlogRecoveryHandler, config); - } - - @Override - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception - { - CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED()); - - _decoratedStore.configureConfigStore(name, recoveryHandler, config); - } - - @Override - public void activate() throws Exception - { - CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_START()); - _decoratedStore.activate(); - CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE()); - } - - @Override - public void close() throws Exception - { - CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); - _decoratedStore.close(); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index a3d1a7999d..07d8bb97f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -21,9 +21,32 @@ package org.apache.qpid.server.store.derby; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; @@ -36,6 +59,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MessageMetaDataType; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreConstants; @@ -50,31 +74,6 @@ import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.lang.ref.SoftReference; -import java.nio.ByteBuffer; -import java.sql.Blob; -import java.sql.Connection; -import java.sql.Driver; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Types; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - /** * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence * mechanism. @@ -229,27 +228,34 @@ public class DerbyMessageStore implements MessageStore private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; - private final StateManager _stateManager = new StateManager(); + private final StateManager _stateManager; + + private final EventManager _eventManager = new EventManager(); private MessageStoreRecoveryHandler _messageRecoveryHandler; private TransactionLogRecoveryHandler _tlogRecoveryHandler; private ConfigurationRecoveryHandler _configRecoveryHandler; + private String _storeLocation; + + public DerbyMessageStore() + { + _stateManager = new StateManager(_eventManager); + } @Override public void configureConfigStore(String name, ConfigurationRecoveryHandler configRecoveryHandler, Configuration storeConfiguration) throws Exception { - _stateManager.stateTransition(State.INITIAL, State.CONFIGURING); + _stateManager.attainState(State.CONFIGURING); _configRecoveryHandler = configRecoveryHandler; commonConfiguration(name, storeConfiguration); } - @Override public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, @@ -258,19 +264,22 @@ public class DerbyMessageStore implements MessageStore { _tlogRecoveryHandler = tlogRecoveryHandler; _messageRecoveryHandler = recoveryHandler; + + _stateManager.attainState(State.CONFIGURED); } @Override public void activate() throws Exception { - _stateManager.stateTransition(State.CONFIGURING, State.RECOVERING); + _stateManager.attainState(State.RECOVERING); // this recovers durable exchanges, queues, and bindings recoverConfiguration(_configRecoveryHandler); recoverMessages(_messageRecoveryHandler); TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler); recoverXids(dtxrh); - _stateManager.stateTransition(State.RECOVERING, State.ACTIVE); + + _stateManager.attainState(State.ACTIVE); } private void commonConfiguration(String name, Configuration storeConfiguration) @@ -293,6 +302,8 @@ public class DerbyMessageStore implements MessageStore } } + _storeLocation = databasePath; + createOrOpenDatabase(name, databasePath); } @@ -2678,20 +2689,14 @@ public class DerbyMessageStore implements MessageStore } @Override - public void addEventListener(EventListener eventListener, Event event) + public void addEventListener(EventListener eventListener, Event... events) { - throw new UnsupportedOperationException(); + _eventManager.addEventListener(eventListener, events); } @Override - public MessageStore getUnderlyingStore() + public String getStoreLocation() { - return this; + return _storeLocation; } - - public String getDatabaseProviderName() - { - return DerbyMessageStore.class.getName(); - } - }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java index 02b59dfc06..12d7f64a8d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java @@ -19,19 +19,16 @@ */ package org.apache.qpid.server.store.derby; -import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreFactory; -import org.apache.qpid.server.store.decorators.EventDecorator; -import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator; public class DerbyMessageStoreFactory implements MessageStoreFactory { @Override - public MessageStore createMessageStore(LogSubject logSubject) + public MessageStore createMessageStore() { - return new OperationalLoggingDecorator(new EventDecorator(new DerbyMessageStore()), logSubject); + return new DerbyMessageStore(); } @Override diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index d4f3b11a8c..9b113525d4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -61,6 +61,7 @@ import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreFactory; +import org.apache.qpid.server.store.OperationalLoggingListener; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; @@ -69,7 +70,6 @@ import javax.management.JMException; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -336,8 +336,9 @@ public class VirtualHostImpl implements VirtualHost } final MessageStoreFactory messageStoreFactory = (MessageStoreFactory) o; + final MessageStore messageStore = messageStoreFactory.createMessageStore(); final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStoreFactory.getStoreClassName()); - final MessageStore messageStore = messageStoreFactory.createMessageStore(storeLogSubject); + OperationalLoggingListener.listen(messageStore, storeLogSubject); messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE); @@ -741,40 +742,38 @@ public class VirtualHostImpl implements VirtualHost } private final class BeforeActivationListener implements EventListener - { - @Override - public void event(Event event) - { - try - { - _exchangeRegistry.initialise(); - initialiseModel(_vhostConfig); - } - catch (Exception e) - { - throw new RuntimeException("Failed to initialise virtual host after state change", e); - } - } - } - - private final class AfterActivationListener implements EventListener - { - @Override - public void event(Event event) - { - initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); - try - { - _brokerMBean.register(); - } - catch (JMException e) - { - throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); - } + { + @Override + public void event(Event event) + { + try + { + _exchangeRegistry.initialise(); + initialiseModel(_vhostConfig); + } catch (Exception e) + { + throw new RuntimeException("Failed to initialise virtual host after state change", e); + } + } + } - _state = State.ACTIVE; - } - } + private final class AfterActivationListener implements EventListener + { + @Override + public void event(Event event) + { + initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + try + { + _brokerMBean.register(); + } catch (JMException e) + { + throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); + } + + _state = State.ACTIVE; + } + } public class BeforePassivationListener implements EventListener { @@ -794,69 +793,67 @@ public class VirtualHostImpl implements VirtualHost } } - private final class BeforeCloseListener implements EventListener - { - @Override - public void event(Event event) - { - _brokerMBean.unregister(); - shutdownHouseKeeping(); - } - } - - private class VirtualHostHouseKeepingTask extends HouseKeepingTask - { - public VirtualHostHouseKeepingTask() - { - super(VirtualHostImpl.this); - } - - public void execute() - { - for (AMQQueue q : _queueRegistry.getQueues()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Checking message status for queue: " - + q.getName()); - } - try - { - q.checkMessageStatus(); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for queue: " - + q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. - } - } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Checking for long running open transactions on connection " + connection); - } - for (AMQSessionModel session : connection.getSessionModels()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Checking for long running open transactions on session " + session); - } - try - { - session.checkTransactionStatus(_vhostConfig.getTransactionTimeoutOpenWarn(), - _vhostConfig.getTransactionTimeoutOpenClose(), - _vhostConfig.getTransactionTimeoutIdleWarn(), - _vhostConfig.getTransactionTimeoutIdleClose()); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); - } - } - } - } - } + private final class BeforeCloseListener implements EventListener + { + @Override + public void event(Event event) + { + _brokerMBean.unregister(); + shutdownHouseKeeping(); + } + } + + private class VirtualHostHouseKeepingTask extends HouseKeepingTask + { + public VirtualHostHouseKeepingTask() + { + super(VirtualHostImpl.this); + } + + public void execute() + { + for (AMQQueue q : _queueRegistry.getQueues()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking message status for queue: " + + q.getName()); + } + try + { + q.checkMessageStatus(); + } catch (Exception e) + { + _logger.error("Exception in housekeeping for queue: " + + q.getNameShortString().toString(), e); + //Don't throw exceptions as this will stop the + // house keeping task from running. + } + } + for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking for long running open transactions on connection " + connection); + } + for (AMQSessionModel session : connection.getSessionModels()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking for long running open transactions on session " + session); + } + try + { + session.checkTransactionStatus(_vhostConfig.getTransactionTimeoutOpenWarn(), + _vhostConfig.getTransactionTimeoutOpenClose(), + _vhostConfig.getTransactionTimeoutIdleWarn(), + _vhostConfig.getTransactionTimeoutIdleClose()); + } catch (Exception e) + { + _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); + } + } + } + } + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index d588cdd42c..45933e7064 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -157,7 +157,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase private void verifyBrokerState() { - TestableMemoryMessageStore store = (TestableMemoryMessageStore) getVirtualHost().getMessageStore().getUnderlyingStore(); + TestableMemoryMessageStore store = (TestableMemoryMessageStore) getVirtualHost().getMessageStore(); // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up. diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 7c3098298e..8041d59ffa 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -636,7 +636,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase qs.add(_queue); MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis()); - TestableMemoryMessageStore store = (TestableMemoryMessageStore) _virtualHost.getMessageStore().getUnderlyingStore(); + TestableMemoryMessageStore store = (TestableMemoryMessageStore) _virtualHost.getMessageStore(); StoredMessage handle = store.addMessage(metaData); msg.setStoredMessage(handle); diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java new file mode 100644 index 0000000000..42746f9119 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java @@ -0,0 +1,181 @@ +package org.apache.qpid.server.store; + +import java.util.ArrayList; +import java.util.List; +import junit.framework.TestCase; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class OperationalLoggingListenerTest extends TestCase +{ + + + public static final String STORE_LOCATION = "The moon!"; + + protected void setUp() throws Exception + { + super.setUp(); + + } + + public void testOperationalLoggingWithStoreLocation() throws Exception + { + TestMessageStore messageStore = new TestMessageStore(); + LogSubject logSubject = LOG_SUBJECT; + + OperationalLoggingListener.listen(messageStore, logSubject); + + performTests(messageStore, true); + + } + + public void testOperationalLogging() throws Exception + { + TestMessageStore messageStore = new TestMessageStore(); + LogSubject logSubject = LOG_SUBJECT; + + OperationalLoggingListener.listen(messageStore, logSubject); + + performTests(messageStore, false); + } + + private void performTests(TestMessageStore messageStore, boolean setStoreLocation) + { + final List<LogMessage> messages = new ArrayList<LogMessage>(); + + CurrentActor.set(new TestActor(messages)); + + if(setStoreLocation) + { + messageStore.setStoreLocation(STORE_LOCATION); + } + + + messageStore.attainState(State.CONFIGURING); + assertEquals("Unexpected number of operational log messages on configuring", 1, messages.size()); + assertEquals(messages.remove(0).toString(), ConfigStoreMessages.CREATED().toString()); + + messageStore.attainState(State.CONFIGURED); + assertEquals("Unexpected number of operational log messages on CONFIGURED", setStoreLocation ? 3 : 2, messages.size()); + assertEquals(messages.remove(0).toString(), MessageStoreMessages.CREATED().toString()); + assertEquals(messages.remove(0).toString(), TransactionLogMessages.CREATED().toString()); + if(setStoreLocation) + { + assertEquals(messages.remove(0).toString(), MessageStoreMessages.STORE_LOCATION(STORE_LOCATION).toString()); + } + + messageStore.attainState(State.RECOVERING); + assertEquals("Unexpected number of operational log messages on RECOVERING", 1, messages.size()); + assertEquals(messages.remove(0).toString(), MessageStoreMessages.RECOVERY_START().toString()); + + + messageStore.attainState(State.ACTIVE); + assertEquals("Unexpected number of operational log messages on ACTIVE", 1, messages.size()); + assertEquals(messages.remove(0).toString(), MessageStoreMessages.RECOVERY_COMPLETE().toString()); + + messageStore.attainState(State.CLOSING); + assertEquals("Unexpected number of operational log messages on CLOSING", 0, messages.size()); + + messageStore.attainState(State.CLOSED); + assertEquals("Unexpected number of operational log messages on CLOSED", 1, messages.size()); + assertEquals(messages.remove(0).toString(), MessageStoreMessages.CLOSED().toString()); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + CurrentActor.remove(); + } + + + private static final LogSubject LOG_SUBJECT = new LogSubject() + { + public String toLogString() + { + return ""; + } + }; + + private static final class TestMessageStore extends NullMessageStore + { + + private final EventManager _eventManager = new EventManager(); + private final StateManager _stateManager = new StateManager(_eventManager); + private String _storeLocation; + + public void attainState(State state) + { + _stateManager.attainState(state); + } + + @Override + public String getStoreLocation() + { + return _storeLocation; + } + + public void setStoreLocation(String storeLocation) + { + _storeLocation = storeLocation; + } + + @Override + public void addEventListener(EventListener eventListener, Event... events) + { + _eventManager.addEventListener(eventListener, events); + } + } + + private static class TestActor implements LogActor + { + private final List<LogMessage> _messages; + + public TestActor(List<LogMessage> messages) + { + _messages = messages; + } + + public void message(LogSubject subject, LogMessage message) + { + _messages.add(message); + } + + public void message(LogMessage message) + { + _messages.add(message); + } + + public RootMessageLogger getRootMessageLogger() + { + return null; + } + + public String getLogMessage() + { + return null; + } + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java index b09dcbbdf3..97c88ca1d3 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java @@ -21,12 +21,20 @@ package org.apache.qpid.server.store; +import java.util.EnumSet; import junit.framework.TestCase; -public class StateManagerTest extends TestCase +public class StateManagerTest extends TestCase implements EventListener { - private StateManager _manager = new StateManager(); + private StateManager _manager; + private Event _event; + + public void setUp() throws Exception + { + super.setUp(); + _manager = new StateManager(this); + } public void testInitialState() { @@ -37,8 +45,8 @@ public class StateManagerTest extends TestCase { assertEquals(State.INITIAL, _manager.getState()); - _manager.stateTransition(State.INITIAL, State.ACTIVE); - assertEquals(State.ACTIVE, _manager.getState()); + _manager.stateTransition(State.INITIAL, State.CONFIGURING); + assertEquals(State.CONFIGURING, _manager.getState()); } public void testStateTransitionDisallowed() @@ -86,4 +94,102 @@ public class StateManagerTest extends TestCase } assertEquals(State.INITIAL, _manager.getState()); } + + public void testValidStateTransitions() + { + assertEquals(State.INITIAL, _manager.getState()); + performValidTransition(StateManager.CONFIGURE); + performValidTransition(StateManager.CONFIGURE_COMPLETE); + performValidTransition(StateManager.RECOVER); + performValidTransition(StateManager.ACTIVATE); + performValidTransition(StateManager.QUIESCE); + performValidTransition(StateManager.QUIESCE_COMPLETE); + performValidTransition(StateManager.RESTART); + performValidTransition(StateManager.ACTIVATE); + performValidTransition(StateManager.CLOSE_ACTIVE); + performValidTransition(StateManager.CLOSE_COMPLETE); + + _manager = new StateManager(this); + performValidTransition(StateManager.CONFIGURE); + performValidTransition(StateManager.CONFIGURE_COMPLETE); + performValidTransition(StateManager.RECOVER); + performValidTransition(StateManager.ACTIVATE); + performValidTransition(StateManager.QUIESCE); + performValidTransition(StateManager.QUIESCE_COMPLETE); + performValidTransition(StateManager.CLOSE_QUIESCED); + performValidTransition(StateManager.CLOSE_COMPLETE); + } + + private void performValidTransition(StateManager.Transition transition) + { + _manager.attainState(transition.getEndState()); + assertEquals("Unexpected end state", transition.getEndState(), _manager.getState()); + assertEquals("Unexpected event", transition.getEvent(), _event); + _event = null; + } + + public void testInvalidStateTransitions() + { + assertEquals(State.INITIAL, _manager.getState()); + + + performInvalidTransitions(StateManager.CONFIGURE, State.CONFIGURED); + performInvalidTransitions(StateManager.CONFIGURE_COMPLETE, State.RECOVERING); + performInvalidTransitions(StateManager.RECOVER, State.ACTIVE); + performInvalidTransitions(StateManager.ACTIVATE, State.QUIESCING, State.CLOSING); + performInvalidTransitions(StateManager.QUIESCE, State.QUIESCED); + performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.RECOVERING, State.CLOSING); + performInvalidTransitions(StateManager.CLOSE_QUIESCED, State.CLOSED); + performInvalidTransitions(StateManager.CLOSE_COMPLETE); + + + + + } + + private void performInvalidTransitions(StateManager.Transition preTransition, State... validTransitions) + { + if(preTransition != null) + { + performValidTransition(preTransition); + } + + EnumSet<State> nextStates = EnumSet.allOf(State.class); + + if(validTransitions != null) + { + for(State state: validTransitions) + { + nextStates.remove(state); + } + } + + for(State nextState : nextStates) + { + performInvalidStateTransition(nextState); + } + + + } + + private void performInvalidStateTransition(State state) + { + try + { + _event = null; + State startState = _manager.getState(); + _manager.attainState(state); + fail("Invalid state transition performed: " + startState + " to " + state); + } + catch(IllegalStateException e) + { + // pass + } + assertNull("No event should have be fired", _event); + } + + public void event(Event event) + { + _event = event; + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java index a737836ed5..44070f22ad 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java @@ -19,17 +19,13 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.store.decorators.EventDecorator; -import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator; - public class TestableMemoryMessageStoreFactory implements MessageStoreFactory { @Override - public MessageStore createMessageStore(LogSubject logSubject) + public MessageStore createMessageStore() { - return new OperationalLoggingDecorator(new EventDecorator(new TestableMemoryMessageStore()), logSubject); + return new TestableMemoryMessageStore(); } @Override diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java deleted file mode 100644 index 7038b8710b..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.decorators; - -import static org.mockito.Mockito.*; - -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.decorators.EventDecorator; -import org.mockito.InOrder; - -import junit.framework.TestCase; - -public class EventDecoratorTest extends TestCase -{ - private MessageStore _mockStore = mock(MessageStore.class); - private EventListener _mockListener = mock(EventListener.class); - - private EventDecorator _eventDecorator = new EventDecorator(_mockStore); - private InOrder _orderMock = inOrder(_mockListener, _mockStore); - - public void testBeforeActivateDecoration() throws Exception - { - _eventDecorator.addEventListener(_mockListener, Event.BEFORE_ACTIVATE); - _eventDecorator.activate(); - - _orderMock.verify(_mockListener).event(Event.BEFORE_ACTIVATE); - _orderMock.verify(_mockStore).activate(); - } - - public void testAfterActivateDecoration() throws Exception - { - _eventDecorator.addEventListener(_mockListener, Event.AFTER_ACTIVATE); - _eventDecorator.activate(); - - _orderMock.verify(_mockStore).activate(); - _orderMock.verify(_mockListener).event(Event.AFTER_ACTIVATE); - } - - public void testBeforeAfterActivateDecoration() throws Exception - { - _eventDecorator.addEventListener(_mockListener, Event.BEFORE_ACTIVATE); - _eventDecorator.addEventListener(_mockListener, Event.AFTER_ACTIVATE); - _eventDecorator.activate(); - - _orderMock.verify(_mockListener).event(Event.BEFORE_ACTIVATE); - _orderMock.verify(_mockStore).activate(); - _orderMock.verify(_mockListener).event(Event.AFTER_ACTIVATE); - } - - public void testBeforeAfterCloseDecoration() throws Exception - { - _eventDecorator.addEventListener(_mockListener, Event.BEFORE_CLOSE); - _eventDecorator.addEventListener(_mockListener, Event.AFTER_CLOSE); - _eventDecorator.close(); - - _orderMock.verify(_mockListener).event(Event.BEFORE_CLOSE); - _orderMock.verify(_mockStore).close(); - _orderMock.verify(_mockListener).event(Event.AFTER_CLOSE); - } -} diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java deleted file mode 100644 index cf06d3ab72..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.decorators; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import static org.apache.qpid.server.store.MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY; - -import junit.framework.TestCase; - -import org.apache.commons.configuration.Configuration; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogMessage; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; -import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator; -import org.hamcrest.Description; -import org.mockito.ArgumentMatcher; -import org.mockito.InOrder; - -public class OperationalLoggingDecoratorTest extends TestCase -{ - private MessageStore _messageStore = mock(MessageStore.class); - private LogActor _mockActor = mock(LogActor.class); - private LogSubject _mockLogSubject = mock(LogSubject.class); - private OperationalLoggingDecorator _operationalLoggingDecorator = new OperationalLoggingDecorator(_messageStore, _mockLogSubject); - private InOrder _inOrder = inOrder(_mockActor, _messageStore); - - protected void setUp() throws Exception - { - super.setUp(); - CurrentActor.set(_mockActor); - } - - public void testConfigureMessageStore() throws Exception - { - _operationalLoggingDecorator.configureMessageStore(null,null,null,null); - - _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1001 : Created")); - _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("TXN-1001 : Created")); - _inOrder.verify(_messageStore).configureMessageStore(anyString(), any(MessageStoreRecoveryHandler.class), any(TransactionLogRecoveryHandler.class), any(Configuration.class)); - } - - public void testConfigureMessageStoreWithStoreLocation() throws Exception - { - final String storeLocation = "/my/store/location"; - Configuration mockConfig = mock(Configuration.class); - when(mockConfig.getString(ENVIRONMENT_PATH_PROPERTY)).thenReturn(storeLocation); - - _operationalLoggingDecorator.configureMessageStore(null,null,null, mockConfig); - - _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1001 : Created")); - _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("TXN-1001 : Created")); - _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1002 : Store location : " + storeLocation)); - _inOrder.verify(_messageStore).configureMessageStore(anyString(), any(MessageStoreRecoveryHandler.class), any(TransactionLogRecoveryHandler.class), any(Configuration.class)); - } - - public void testConfigureConfigStore() throws Exception - { - _operationalLoggingDecorator.configureConfigStore(null,null,null); - - _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("CFG-1001 : Created")); - _inOrder.verify(_messageStore).configureConfigStore(anyString(), any(ConfigurationRecoveryHandler.class), any(Configuration.class)); - } - - public void testActivate() throws Exception - { - _operationalLoggingDecorator.activate(); - - _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1004 : Recovery Start")); - _inOrder.verify(_messageStore).activate(); - _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1006 : Recovery Complete")); - } - - public void testClose() throws Exception - { - _operationalLoggingDecorator.close(); - - _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1003 : Closed")); - _inOrder.verify(_messageStore).close(); - } - - @Override - protected void tearDown() throws Exception - { - super.tearDown(); - CurrentActor.remove(); - } - - private LogMessage matchesLogMessage(String expectedLogMessage) - { - return argThat(new LogMessageArgumentMatcher(expectedLogMessage)); - } - - private final class LogMessageArgumentMatcher extends ArgumentMatcher<LogMessage> - { - private final String _expectedText; - private String _description = null; -; - public LogMessageArgumentMatcher(String _expectedLogMessage) - { - this._expectedText = _expectedLogMessage; - } - - @Override - public boolean matches(Object item) - { - LogMessage logMessage = (LogMessage) item; - final String actualText = logMessage.toString(); - if (actualText.equals(_expectedText)) - { - return true; - } - else - { - _description = "Expected <" + _expectedText + "> but got <" + actualText + ">"; - return false; - } - } - - @Override - public void describeTo(Description description) - { - if (description != null) - { - description.appendText(" : "+ _description); - } - } - } -}
\ No newline at end of file diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 6b48d55fae..5a11a7aa32 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -98,7 +98,7 @@ public class InternalBrokerBaseCase extends QpidTestCase _virtualHost.getBindingFactory().addBinding(QUEUE_NAME.toString(), _queue, defaultExchange, null); _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); - _messageStore = _virtualHost.getMessageStore().getUnderlyingStore(); + _messageStore = _virtualHost.getMessageStore(); _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()), false, new AMQShortString("testowner"), false, false, _virtualHost, null); diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 2de8b7cdb7..69cc64c0d0 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -358,14 +358,15 @@ public class SlowMessageStore implements MessageStore } @Override - public void addEventListener(EventListener eventListener, Event event) + public void addEventListener(EventListener eventListener, Event... events) { - throw new UnsupportedOperationException(); + _realStore.addEventListener(eventListener, events); } @Override - public MessageStore getUnderlyingStore() + public String getStoreLocation() { - return _realStore.getUnderlyingStore(); + return _realStore.getStoreLocation(); } + } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java index 840362485e..6497a640d2 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java @@ -19,16 +19,13 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.store.decorators.EventDecorator; - public class SlowMessageStoreFactory implements MessageStoreFactory { @Override - public MessageStore createMessageStore(LogSubject logSubject) + public MessageStore createMessageStore() { - return new EventDecorator(new SlowMessageStore()); + return new SlowMessageStore(); } @Override |