summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-04-12 19:46:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-04-12 19:46:03 +0000
commit258e354c633f5efd66d5e57550609b83b37cc330 (patch)
treed75ccdf4dacde868336ad4adf6189e71b1eb08a9
parentbbda9004d532dc5f5f18553edf43456161a186db (diff)
downloadqpid-python-258e354c633f5efd66d5e57550609b83b37cc330.tar.gz
QPID-3917 : Refactor changes to MessageStore to use listeners and not decorators
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1325467 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java61
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java7
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/Event.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java37
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java73
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/State.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java93
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java188
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java86
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java87
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java197
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java181
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java114
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java8
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java79
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java157
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java9
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java7
29 files changed, 726 insertions, 793 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index f0fd97dc85..3f394b79a7 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -33,10 +33,18 @@ import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.TransactionConfig;
-
+import java.io.File;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -52,6 +60,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHa
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
@@ -83,17 +92,6 @@ import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
-import java.io.File;
-import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
public abstract class AbstractBDBMessageStore implements MessageStore
{
private static final Logger LOGGER = Logger.getLogger(AbstractBDBMessageStore.class);
@@ -154,7 +152,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private final AtomicLong _messageId = new AtomicLong(0);
- protected final StateManager _stateManager = new StateManager();
+ protected final StateManager _stateManager;
protected TransactionConfig _transactionConfig = new TransactionConfig();
@@ -165,20 +163,27 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
private ConfigurationRecoveryHandler _configRecoveryHandler;
+
+ private final EventManager _eventManager = new EventManager();
+ private String _storeLocation;
public AbstractBDBMessageStore()
{
+ _stateManager = new StateManager(_eventManager);
}
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
Configuration storeConfiguration) throws Exception
{
- _stateManager.stateTransition(State.INITIAL, State.CONFIGURING);
+ _stateManager.attainState(State.CONFIGURING);
_configRecoveryHandler = recoveryHandler;
configure(name,storeConfiguration);
+
+
+
}
public void configureMessageStore(String name,
@@ -188,16 +193,19 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
_messageRecoveryHandler = messageRecoveryHandler;
_tlogRecoveryHandler = tlogRecoveryHandler;
+
+ _stateManager.attainState(State.CONFIGURED);
}
public void activate() throws Exception
{
- _stateManager.stateTransition(State.CONFIGURING, State.RECOVERING);
+ _stateManager.attainState(State.RECOVERING);
recoverConfig(_configRecoveryHandler);
recoverMessages(_messageRecoveryHandler);
recoverQueueEntries(_tlogRecoveryHandler);
- _stateManager.stateTransition(State.RECOVERING, State.ACTIVE);
+
+ _stateManager.attainState(State.ACTIVE);
}
public org.apache.qpid.server.store.Transaction newTransaction()
@@ -216,8 +224,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
public void configure(String name, Configuration storeConfig) throws Exception
{
- File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
- System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name));
+ final String storeLocation = storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
+ System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name);
+
+ File environmentPath = new File(storeLocation);
if (!environmentPath.exists())
{
if (!environmentPath.mkdirs())
@@ -227,6 +237,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
+ _storeLocation = storeLocation;
+
configure(environmentPath, false);
}
@@ -359,7 +371,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
public void close() throws Exception
{
- if (_stateManager.isInState(State.ACTIVE))
+ if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.QUIESCED))
{
_stateManager.stateTransition(State.ACTIVE, State.CLOSING);
@@ -1975,13 +1987,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public void addEventListener(EventListener eventListener, Event event)
+ public void addEventListener(EventListener eventListener, Event... events)
{
- throw new UnsupportedOperationException();
+ _eventManager.addEventListener(eventListener, events);
}
- public MessageStore getUnderlyingStore()
+ @Override
+ public String getStoreLocation()
{
- return this;
+ return _storeLocation;
}
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
index 145d0f35e3..7e5ef3f94c 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -19,19 +19,16 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreFactory;
-import org.apache.qpid.server.store.decorators.EventDecorator;
-import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator;
public class BDBMessageStoreFactory implements MessageStoreFactory
{
@Override
- public MessageStore createMessageStore(LogSubject logSubject)
+ public MessageStore createMessageStore()
{
- return new OperationalLoggingDecorator(new EventDecorator(new BDBMessageStore()), logSubject);
+ return new BDBMessageStore();
}
@Override
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index a45b98b8b5..5b69c2ead6 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -369,10 +369,10 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
}
private BDBMessageStore assertBDBStore(MessageStore store)
{
- MessageStore underlyingStore = store.getUnderlyingStore();
- assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, underlyingStore.getClass());
- return (BDBMessageStore) underlyingStore;
+ assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass());
+
+ return (BDBMessageStore) store;
}
private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/Event.java b/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
index 95b0186027..bbde11ab4c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
@@ -21,6 +21,8 @@ package org.apache.qpid.server.store;
public enum Event
{
+ BEFORE_INIT,
+ AFTER_INIT,
BEFORE_ACTIVATE,
AFTER_ACTIVATE,
BEFORE_PASSIVATE,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java b/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
index 3e10f758f9..21ae3924b8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
@@ -20,22 +20,29 @@
package org.apache.qpid.server.store;
import java.util.ArrayList;
+import java.util.EnumMap;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
public class EventManager
{
- private ConcurrentMap<Event, List<EventListener>> _listeners = new ConcurrentHashMap<Event, List<EventListener>>();
+ private Map<Event, List<EventListener>> _listeners = new EnumMap<Event, List<EventListener>> (Event.class);
- public void addEventListener(EventListener listener, Event event)
+ public synchronized void addEventListener(EventListener listener, Event... events)
{
- _listeners.putIfAbsent(event, new ArrayList<EventListener>());
- final List<EventListener> list = _listeners.get(event);
- list.add(listener);
+ for(Event event : events)
+ {
+ List<EventListener> list = _listeners.get(event);
+ if(list == null)
+ {
+ list = new ArrayList<EventListener>();
+ _listeners.put(event,list);
+ }
+ list.add(listener);
+ }
}
- public void notifyEvent(Event event)
+ public synchronized void notifyEvent(Event event)
{
if (_listeners.containsKey(event))
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index de9e73f914..59624b7a75 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueableMessage;
@@ -71,6 +72,34 @@ public class MemoryMessageStore extends NullMessageStore
}
};
+ private final StateManager _stateManager;
+ private final EventManager _eventManager = new EventManager();
+
+ public MemoryMessageStore()
+ {
+ _stateManager = new StateManager(_eventManager);
+ }
+
+ @Override
+ public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
+ {
+ _stateManager.attainState(State.CONFIGURING);
+ }
+
+ @Override
+ public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
+ {
+ _stateManager.attainState(State.CONFIGURED);
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ _stateManager.attainState(State.RECOVERING);
+
+ _stateManager.attainState(State.ACTIVE);
+ }
+
@Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
@@ -95,6 +124,14 @@ public class MemoryMessageStore extends NullMessageStore
@Override
public void close() throws Exception
{
+ _stateManager.attainState(State.CLOSING);
_closed.getAndSet(true);
+ _stateManager.attainState(State.CLOSED);
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event... events)
+ {
+ _eventManager.addEventListener(eventListener, events);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
index 0fb7b1f84f..8724f102c6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
@@ -19,17 +19,13 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.store.decorators.EventDecorator;
-import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator;
-
public class MemoryMessageStoreFactory implements MessageStoreFactory
{
@Override
- public MessageStore createMessageStore(LogSubject logSubject)
+ public MessageStore createMessageStore()
{
- return new OperationalLoggingDecorator(new EventDecorator(new MemoryMessageStore()), logSubject);
+ return new MemoryMessageStore();
}
@Override
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index c76d4f223f..cf08ee00ff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -64,8 +64,7 @@ public interface MessageStore extends DurableConfigurationStore
*/
void close() throws Exception;
- void addEventListener(EventListener eventListener, Event event);
-
- MessageStore getUnderlyingStore();
+ void addEventListener(EventListener eventListener, Event... events);
+ String getStoreLocation();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java
index 878798eac3..a35db62b03 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java
@@ -19,11 +19,9 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.logging.LogSubject;
-
public interface MessageStoreFactory
{
- MessageStore createMessageStore(LogSubject logSubject);
+ MessageStore createMessageStore();
String getStoreClassName();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 0b55f74730..77df6c5abf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -133,13 +133,14 @@ public class NullMessageStore implements MessageStore
}
@Override
- public void addEventListener(EventListener eventListener, Event event)
+ public void addEventListener(EventListener eventListener, Event... events)
{
}
@Override
- public MessageStore getUnderlyingStore()
+ public String getStoreLocation()
{
- return this;
+ return null;
}
+
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java b/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
new file mode 100644
index 0000000000..caff17daa5
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+
+public class OperationalLoggingListener implements EventListener
+{
+ protected final LogSubject _logSubject;
+ private MessageStore _store;
+
+ private OperationalLoggingListener(final MessageStore store, LogSubject logSubject)
+ {
+ _logSubject = logSubject;
+ store.addEventListener(this, Event.BEFORE_INIT, Event.AFTER_INIT, Event.BEFORE_ACTIVATE, Event.AFTER_ACTIVATE, Event.AFTER_CLOSE);
+ _store = store;
+ }
+
+ public void event(Event event)
+ {
+ switch(event)
+ {
+ case BEFORE_INIT:
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED());
+ break;
+ case AFTER_INIT:
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED());
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED());
+ String storeLocation = _store.getStoreLocation();
+ if (storeLocation != null)
+ {
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(storeLocation));
+ }
+ break;
+ case BEFORE_ACTIVATE:
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_START());
+ break;
+ case AFTER_ACTIVATE:
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
+ break;
+ case AFTER_CLOSE:
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+ break;
+
+ }
+ }
+
+ public static void listen(final MessageStore store, LogSubject logSubject)
+ {
+ new OperationalLoggingListener(store, logSubject);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/State.java b/java/broker/src/main/java/org/apache/qpid/server/store/State.java
index 7928f613d9..7cbdede85e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/State.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/State.java
@@ -22,10 +22,17 @@ package org.apache.qpid.server.store;
public enum State
{
+
INITIAL,
CONFIGURING,
+ CONFIGURED,
RECOVERING,
ACTIVE,
+ QUIESCING,
+ QUIESCED,
CLOSING,
- CLOSED
+ CLOSED;
+
+
+
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java b/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
index 41b3cb81bb..5998be5bb6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
@@ -21,9 +21,84 @@
package org.apache.qpid.server.store;
+import java.util.EnumMap;
+import java.util.Map;
+
public class StateManager
{
private State _state = State.INITIAL;
+ private EventListener _eventListener;
+
+ private static final Map<State,Map<State, Transition>> _validTransitions = new EnumMap<State, Map<State, Transition>>(State.class);
+
+
+ static class Transition
+ {
+ private final Event _event;
+ private final State _endState;
+ private final State _startState;
+
+ public Transition(State startState, State endState, Event event)
+ {
+ _event = event;
+ _startState = startState;
+ _endState = endState;
+
+ Map<State, Transition> stateTransitions = _validTransitions.get(startState);
+ if(stateTransitions == null)
+ {
+ stateTransitions = new EnumMap<State, Transition>(State.class);
+ _validTransitions.put(startState, stateTransitions);
+ }
+ stateTransitions.put(endState, this);
+ }
+
+ public Event getEvent()
+ {
+ return _event;
+ }
+
+ public State getStartState()
+ {
+ return _startState;
+ }
+
+ public State getEndState()
+ {
+ return _endState;
+ }
+
+ }
+
+ public static final Transition CONFIGURE = new Transition(State.INITIAL, State.CONFIGURING, Event.BEFORE_INIT);
+ public static final Transition CONFIGURE_COMPLETE = new Transition(State.CONFIGURING, State.CONFIGURED, Event.AFTER_INIT);
+ public static final Transition RECOVER = new Transition(State.CONFIGURED, State.RECOVERING, Event.BEFORE_ACTIVATE);
+ public static final Transition ACTIVATE = new Transition(State.RECOVERING, State.ACTIVE, Event.AFTER_ACTIVATE);
+ public static final Transition CLOSE_ACTIVE = new Transition(State.ACTIVE, State.CLOSING, Event.BEFORE_CLOSE);
+ public static final Transition CLOSE_QUIESCED = new Transition(State.QUIESCED, State.CLOSING, Event.BEFORE_CLOSE);
+ public static final Transition CLOSE_COMPLETE = new Transition(State.CLOSING, State.CLOSED, Event.AFTER_CLOSE);
+ public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_PASSIVATE);
+ public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.BEFORE_PASSIVATE);
+ public static final Transition RESTART = new Transition(State.QUIESCED, State.RECOVERING, Event.BEFORE_ACTIVATE);
+
+
+ public StateManager(final EventManager eventManager)
+ {
+ this(new EventListener()
+ {
+ @Override
+ public void event(Event event)
+ {
+ eventManager.notifyEvent(event);
+ }
+ });
+ }
+
+
+ public StateManager(EventListener eventListener)
+ {
+ _eventListener = eventListener;
+ }
public synchronized State getState()
{
@@ -37,9 +112,25 @@ public class StateManager
throw new IllegalStateException("Cannot transition to the state: " + desired + "; need to be in state: " + current
+ "; currently in state: " + _state);
}
- _state = desired;
+ attainState(desired);
}
+ public synchronized void attainState(State desired)
+ {
+ Transition transition = null;
+ final Map<State, Transition> stateTransitionMap = _validTransitions.get(_state);
+ if(stateTransitionMap != null)
+ {
+ transition = stateTransitionMap.get(desired);
+ }
+ if(transition == null)
+ {
+ throw new IllegalStateException("No valid transition from state " + _state + " to state " + desired);
+ }
+ _state = desired;
+ _eventListener.event(transition.getEvent());
+ }
+
public synchronized boolean isInState(State testedState)
{
return _state.equals(testedState);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java b/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java
deleted file mode 100644
index a402e6ee5c..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.decorators;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-
-/**
- * AbstractDecorator. All methods <bMUST</b> perform simple
- * delegation to their equivalent decorated counterpart without
- * change.
- */
-public class AbstractDecorator implements MessageStore
-{
- protected final MessageStore _decoratedStore;
-
- public AbstractDecorator(MessageStore store)
- {
- _decoratedStore = store;
- }
-
- @Override
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration config) throws Exception
- {
- _decoratedStore.configureMessageStore(name, messageRecoveryHandler,
- tlogRecoveryHandler, config);
- }
-
- @Override
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
- {
- _decoratedStore.configureConfigStore(name, recoveryHandler, config);
- }
-
- @Override
- public void activate() throws Exception
- {
- _decoratedStore.activate();
- }
-
- @Override
- public void close() throws Exception
- {
- _decoratedStore.close();
- }
-
- @Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(
- T metaData)
- {
- return _decoratedStore.addMessage(metaData);
- }
-
- @Override
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- _decoratedStore.createExchange(exchange);
- }
-
- @Override
- public boolean isPersistent()
- {
- return _decoratedStore.isPersistent();
- }
-
- @Override
- public Transaction newTransaction()
- {
- return _decoratedStore.newTransaction();
- }
-
- @Override
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- _decoratedStore.removeExchange(exchange);
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event event)
- {
- _decoratedStore.addEventListener(eventListener, event);
- }
-
- @Override
- public void bindQueue(Exchange exchange, AMQShortString routingKey,
- AMQQueue queue, FieldTable args) throws AMQStoreException
- {
- _decoratedStore.bindQueue(exchange, routingKey, queue, args);
- }
-
- @Override
- public void unbindQueue(Exchange exchange, AMQShortString routingKey,
- AMQQueue queue, FieldTable args) throws AMQStoreException
- {
- _decoratedStore.unbindQueue(exchange, routingKey, queue, args);
- }
-
- @Override
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- _decoratedStore.createQueue(queue);
- }
-
- @Override
- public void createQueue(AMQQueue queue, FieldTable arguments)
- throws AMQStoreException
- {
- _decoratedStore.createQueue(queue, arguments);
- }
-
- @Override
- public void removeQueue(AMQQueue queue) throws AMQStoreException
- {
- _decoratedStore.removeQueue(queue);
- }
-
- @Override
- public void updateQueue(AMQQueue queue) throws AMQStoreException
- {
- _decoratedStore.updateQueue(queue);
- }
-
- @Override
- public void createBrokerLink(BrokerLink link) throws AMQStoreException
- {
- _decoratedStore.createBrokerLink(link);
- }
-
- @Override
- public void deleteBrokerLink(BrokerLink link) throws AMQStoreException
- {
- _decoratedStore.deleteBrokerLink(link);
- }
-
- @Override
- public void createBridge(Bridge bridge) throws AMQStoreException
- {
- _decoratedStore.createBridge(bridge);
- }
-
- @Override
- public void deleteBridge(Bridge bridge) throws AMQStoreException
- {
- _decoratedStore.deleteBridge(bridge);
- }
-
- @Override
- public MessageStore getUnderlyingStore()
- {
- return _decoratedStore.getUnderlyingStore();
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java b/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java
deleted file mode 100644
index dbf179c2e6..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.decorators;
-
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.EventManager;
-import org.apache.qpid.server.store.MessageStore;
-
-public class EventDecorator extends AbstractDecorator
-{
- protected final EventManager _eventManager;
-
- public EventDecorator(MessageStore store)
- {
- super(store);
- _eventManager = new EventManager();
- }
-
- @Override
- public void activate() throws Exception
- {
- _eventManager.notifyEvent(Event.BEFORE_ACTIVATE);
- _decoratedStore.activate();
- _eventManager.notifyEvent(Event.AFTER_ACTIVATE);
- }
-
- @Override
- public void close() throws Exception
- {
- _eventManager.notifyEvent(Event.BEFORE_CLOSE);
- _decoratedStore.close();
- _eventManager.notifyEvent(Event.AFTER_CLOSE);
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event event)
- {
- _eventManager.addEventListener(eventListener, event);
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java b/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java
deleted file mode 100644
index 81d9645c01..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.decorators;
-
-import static org.apache.qpid.server.store.MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-
-public class OperationalLoggingDecorator extends AbstractDecorator
-{
- protected final LogSubject _logSubject;
-
- public OperationalLoggingDecorator(final MessageStore decoratedStore, LogSubject logSubject)
- {
- super(decoratedStore);
- _logSubject = logSubject;
- }
-
- @Override
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration config) throws Exception
- {
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED());
- CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED());
-
- if (config != null && config.getString(ENVIRONMENT_PATH_PROPERTY) != null)
- {
- CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(config.getString(ENVIRONMENT_PATH_PROPERTY)));
- }
-
- _decoratedStore.configureMessageStore(name, messageRecoveryHandler,
- tlogRecoveryHandler, config);
- }
-
- @Override
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
- {
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED());
-
- _decoratedStore.configureConfigStore(name, recoveryHandler, config);
- }
-
- @Override
- public void activate() throws Exception
- {
- CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_START());
- _decoratedStore.activate();
- CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
- }
-
- @Override
- public void close() throws Exception
- {
- CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
- _decoratedStore.close();
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index a3d1a7999d..07d8bb97f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -21,9 +21,32 @@
package org.apache.qpid.server.store.derby;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
@@ -36,6 +59,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageMetaDataType;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreConstants;
@@ -50,31 +74,6 @@ import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
* mechanism.
@@ -229,27 +228,34 @@ public class DerbyMessageStore implements MessageStore
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
- private final StateManager _stateManager = new StateManager();
+ private final StateManager _stateManager;
+
+ private final EventManager _eventManager = new EventManager();
private MessageStoreRecoveryHandler _messageRecoveryHandler;
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
private ConfigurationRecoveryHandler _configRecoveryHandler;
+ private String _storeLocation;
+
+ public DerbyMessageStore()
+ {
+ _stateManager = new StateManager(_eventManager);
+ }
@Override
public void configureConfigStore(String name,
ConfigurationRecoveryHandler configRecoveryHandler,
Configuration storeConfiguration) throws Exception
{
- _stateManager.stateTransition(State.INITIAL, State.CONFIGURING);
+ _stateManager.attainState(State.CONFIGURING);
_configRecoveryHandler = configRecoveryHandler;
commonConfiguration(name, storeConfiguration);
}
-
@Override
public void configureMessageStore(String name,
MessageStoreRecoveryHandler recoveryHandler,
@@ -258,19 +264,22 @@ public class DerbyMessageStore implements MessageStore
{
_tlogRecoveryHandler = tlogRecoveryHandler;
_messageRecoveryHandler = recoveryHandler;
+
+ _stateManager.attainState(State.CONFIGURED);
}
@Override
public void activate() throws Exception
{
- _stateManager.stateTransition(State.CONFIGURING, State.RECOVERING);
+ _stateManager.attainState(State.RECOVERING);
// this recovers durable exchanges, queues, and bindings
recoverConfiguration(_configRecoveryHandler);
recoverMessages(_messageRecoveryHandler);
TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
recoverXids(dtxrh);
- _stateManager.stateTransition(State.RECOVERING, State.ACTIVE);
+
+ _stateManager.attainState(State.ACTIVE);
}
private void commonConfiguration(String name, Configuration storeConfiguration)
@@ -293,6 +302,8 @@ public class DerbyMessageStore implements MessageStore
}
}
+ _storeLocation = databasePath;
+
createOrOpenDatabase(name, databasePath);
}
@@ -2678,20 +2689,14 @@ public class DerbyMessageStore implements MessageStore
}
@Override
- public void addEventListener(EventListener eventListener, Event event)
+ public void addEventListener(EventListener eventListener, Event... events)
{
- throw new UnsupportedOperationException();
+ _eventManager.addEventListener(eventListener, events);
}
@Override
- public MessageStore getUnderlyingStore()
+ public String getStoreLocation()
{
- return this;
+ return _storeLocation;
}
-
- public String getDatabaseProviderName()
- {
- return DerbyMessageStore.class.getName();
- }
-
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
index 02b59dfc06..12d7f64a8d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
@@ -19,19 +19,16 @@
*/
package org.apache.qpid.server.store.derby;
-import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreFactory;
-import org.apache.qpid.server.store.decorators.EventDecorator;
-import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator;
public class DerbyMessageStoreFactory implements MessageStoreFactory
{
@Override
- public MessageStore createMessageStore(LogSubject logSubject)
+ public MessageStore createMessageStore()
{
- return new OperationalLoggingDecorator(new EventDecorator(new DerbyMessageStore()), logSubject);
+ return new DerbyMessageStore();
}
@Override
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index d4f3b11a8c..9b113525d4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -61,6 +61,7 @@ import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreFactory;
+import org.apache.qpid.server.store.OperationalLoggingListener;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
@@ -69,7 +70,6 @@ import javax.management.JMException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -336,8 +336,9 @@ public class VirtualHostImpl implements VirtualHost
}
final MessageStoreFactory messageStoreFactory = (MessageStoreFactory) o;
+ final MessageStore messageStore = messageStoreFactory.createMessageStore();
final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStoreFactory.getStoreClassName());
- final MessageStore messageStore = messageStoreFactory.createMessageStore(storeLogSubject);
+ OperationalLoggingListener.listen(messageStore, storeLogSubject);
messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE);
@@ -741,40 +742,38 @@ public class VirtualHostImpl implements VirtualHost
}
private final class BeforeActivationListener implements EventListener
- {
- @Override
- public void event(Event event)
- {
- try
- {
- _exchangeRegistry.initialise();
- initialiseModel(_vhostConfig);
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to initialise virtual host after state change", e);
- }
- }
- }
-
- private final class AfterActivationListener implements EventListener
- {
- @Override
- public void event(Event event)
- {
- initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
- try
- {
- _brokerMBean.register();
- }
- catch (JMException e)
- {
- throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
- }
+ {
+ @Override
+ public void event(Event event)
+ {
+ try
+ {
+ _exchangeRegistry.initialise();
+ initialiseModel(_vhostConfig);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Failed to initialise virtual host after state change", e);
+ }
+ }
+ }
- _state = State.ACTIVE;
- }
- }
+ private final class AfterActivationListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+ try
+ {
+ _brokerMBean.register();
+ } catch (JMException e)
+ {
+ throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
+ }
+
+ _state = State.ACTIVE;
+ }
+ }
public class BeforePassivationListener implements EventListener
{
@@ -794,69 +793,67 @@ public class VirtualHostImpl implements VirtualHost
}
}
- private final class BeforeCloseListener implements EventListener
- {
- @Override
- public void event(Event event)
- {
- _brokerMBean.unregister();
- shutdownHouseKeeping();
- }
- }
-
- private class VirtualHostHouseKeepingTask extends HouseKeepingTask
- {
- public VirtualHostHouseKeepingTask()
- {
- super(VirtualHostImpl.this);
- }
-
- public void execute()
- {
- for (AMQQueue q : _queueRegistry.getQueues())
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Checking message status for queue: "
- + q.getName());
- }
- try
- {
- q.checkMessageStatus();
- }
- catch (Exception e)
- {
- _logger.error("Exception in housekeeping for queue: "
- + q.getNameShortString().toString(), e);
- //Don't throw exceptions as this will stop the
- // house keeping task from running.
- }
- }
- for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Checking for long running open transactions on connection " + connection);
- }
- for (AMQSessionModel session : connection.getSessionModels())
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Checking for long running open transactions on session " + session);
- }
- try
- {
- session.checkTransactionStatus(_vhostConfig.getTransactionTimeoutOpenWarn(),
- _vhostConfig.getTransactionTimeoutOpenClose(),
- _vhostConfig.getTransactionTimeoutIdleWarn(),
- _vhostConfig.getTransactionTimeoutIdleClose());
- }
- catch (Exception e)
- {
- _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
- }
- }
- }
- }
- }
+ private final class BeforeCloseListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ _brokerMBean.unregister();
+ shutdownHouseKeeping();
+ }
+ }
+
+ private class VirtualHostHouseKeepingTask extends HouseKeepingTask
+ {
+ public VirtualHostHouseKeepingTask()
+ {
+ super(VirtualHostImpl.this);
+ }
+
+ public void execute()
+ {
+ for (AMQQueue q : _queueRegistry.getQueues())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Checking message status for queue: "
+ + q.getName());
+ }
+ try
+ {
+ q.checkMessageStatus();
+ } catch (Exception e)
+ {
+ _logger.error("Exception in housekeeping for queue: "
+ + q.getNameShortString().toString(), e);
+ //Don't throw exceptions as this will stop the
+ // house keeping task from running.
+ }
+ }
+ for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Checking for long running open transactions on connection " + connection);
+ }
+ for (AMQSessionModel session : connection.getSessionModels())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Checking for long running open transactions on session " + session);
+ }
+ try
+ {
+ session.checkTransactionStatus(_vhostConfig.getTransactionTimeoutOpenWarn(),
+ _vhostConfig.getTransactionTimeoutOpenClose(),
+ _vhostConfig.getTransactionTimeoutIdleWarn(),
+ _vhostConfig.getTransactionTimeoutIdleClose());
+ } catch (Exception e)
+ {
+ _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
+ }
+ }
+ }
+ }
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index d588cdd42c..45933e7064 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -157,7 +157,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
private void verifyBrokerState()
{
- TestableMemoryMessageStore store = (TestableMemoryMessageStore) getVirtualHost().getMessageStore().getUnderlyingStore();
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) getVirtualHost().getMessageStore();
// Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 7c3098298e..8041d59ffa 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -636,7 +636,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
qs.add(_queue);
MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis());
- TestableMemoryMessageStore store = (TestableMemoryMessageStore) _virtualHost.getMessageStore().getUnderlyingStore();
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) _virtualHost.getMessageStore();
StoredMessage handle = store.addMessage(metaData);
msg.setStoredMessage(handle);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
new file mode 100644
index 0000000000..42746f9119
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
@@ -0,0 +1,181 @@
+package org.apache.qpid.server.store;
+
+import java.util.ArrayList;
+import java.util.List;
+import junit.framework.TestCase;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class OperationalLoggingListenerTest extends TestCase
+{
+
+
+ public static final String STORE_LOCATION = "The moon!";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ }
+
+ public void testOperationalLoggingWithStoreLocation() throws Exception
+ {
+ TestMessageStore messageStore = new TestMessageStore();
+ LogSubject logSubject = LOG_SUBJECT;
+
+ OperationalLoggingListener.listen(messageStore, logSubject);
+
+ performTests(messageStore, true);
+
+ }
+
+ public void testOperationalLogging() throws Exception
+ {
+ TestMessageStore messageStore = new TestMessageStore();
+ LogSubject logSubject = LOG_SUBJECT;
+
+ OperationalLoggingListener.listen(messageStore, logSubject);
+
+ performTests(messageStore, false);
+ }
+
+ private void performTests(TestMessageStore messageStore, boolean setStoreLocation)
+ {
+ final List<LogMessage> messages = new ArrayList<LogMessage>();
+
+ CurrentActor.set(new TestActor(messages));
+
+ if(setStoreLocation)
+ {
+ messageStore.setStoreLocation(STORE_LOCATION);
+ }
+
+
+ messageStore.attainState(State.CONFIGURING);
+ assertEquals("Unexpected number of operational log messages on configuring", 1, messages.size());
+ assertEquals(messages.remove(0).toString(), ConfigStoreMessages.CREATED().toString());
+
+ messageStore.attainState(State.CONFIGURED);
+ assertEquals("Unexpected number of operational log messages on CONFIGURED", setStoreLocation ? 3 : 2, messages.size());
+ assertEquals(messages.remove(0).toString(), MessageStoreMessages.CREATED().toString());
+ assertEquals(messages.remove(0).toString(), TransactionLogMessages.CREATED().toString());
+ if(setStoreLocation)
+ {
+ assertEquals(messages.remove(0).toString(), MessageStoreMessages.STORE_LOCATION(STORE_LOCATION).toString());
+ }
+
+ messageStore.attainState(State.RECOVERING);
+ assertEquals("Unexpected number of operational log messages on RECOVERING", 1, messages.size());
+ assertEquals(messages.remove(0).toString(), MessageStoreMessages.RECOVERY_START().toString());
+
+
+ messageStore.attainState(State.ACTIVE);
+ assertEquals("Unexpected number of operational log messages on ACTIVE", 1, messages.size());
+ assertEquals(messages.remove(0).toString(), MessageStoreMessages.RECOVERY_COMPLETE().toString());
+
+ messageStore.attainState(State.CLOSING);
+ assertEquals("Unexpected number of operational log messages on CLOSING", 0, messages.size());
+
+ messageStore.attainState(State.CLOSED);
+ assertEquals("Unexpected number of operational log messages on CLOSED", 1, messages.size());
+ assertEquals(messages.remove(0).toString(), MessageStoreMessages.CLOSED().toString());
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ CurrentActor.remove();
+ }
+
+
+ private static final LogSubject LOG_SUBJECT = new LogSubject()
+ {
+ public String toLogString()
+ {
+ return "";
+ }
+ };
+
+ private static final class TestMessageStore extends NullMessageStore
+ {
+
+ private final EventManager _eventManager = new EventManager();
+ private final StateManager _stateManager = new StateManager(_eventManager);
+ private String _storeLocation;
+
+ public void attainState(State state)
+ {
+ _stateManager.attainState(state);
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return _storeLocation;
+ }
+
+ public void setStoreLocation(String storeLocation)
+ {
+ _storeLocation = storeLocation;
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event... events)
+ {
+ _eventManager.addEventListener(eventListener, events);
+ }
+ }
+
+ private static class TestActor implements LogActor
+ {
+ private final List<LogMessage> _messages;
+
+ public TestActor(List<LogMessage> messages)
+ {
+ _messages = messages;
+ }
+
+ public void message(LogSubject subject, LogMessage message)
+ {
+ _messages.add(message);
+ }
+
+ public void message(LogMessage message)
+ {
+ _messages.add(message);
+ }
+
+ public RootMessageLogger getRootMessageLogger()
+ {
+ return null;
+ }
+
+ public String getLogMessage()
+ {
+ return null;
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
index b09dcbbdf3..97c88ca1d3 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
@@ -21,12 +21,20 @@
package org.apache.qpid.server.store;
+import java.util.EnumSet;
import junit.framework.TestCase;
-public class StateManagerTest extends TestCase
+public class StateManagerTest extends TestCase implements EventListener
{
- private StateManager _manager = new StateManager();
+ private StateManager _manager;
+ private Event _event;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _manager = new StateManager(this);
+ }
public void testInitialState()
{
@@ -37,8 +45,8 @@ public class StateManagerTest extends TestCase
{
assertEquals(State.INITIAL, _manager.getState());
- _manager.stateTransition(State.INITIAL, State.ACTIVE);
- assertEquals(State.ACTIVE, _manager.getState());
+ _manager.stateTransition(State.INITIAL, State.CONFIGURING);
+ assertEquals(State.CONFIGURING, _manager.getState());
}
public void testStateTransitionDisallowed()
@@ -86,4 +94,102 @@ public class StateManagerTest extends TestCase
}
assertEquals(State.INITIAL, _manager.getState());
}
+
+ public void testValidStateTransitions()
+ {
+ assertEquals(State.INITIAL, _manager.getState());
+ performValidTransition(StateManager.CONFIGURE);
+ performValidTransition(StateManager.CONFIGURE_COMPLETE);
+ performValidTransition(StateManager.RECOVER);
+ performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.QUIESCE);
+ performValidTransition(StateManager.QUIESCE_COMPLETE);
+ performValidTransition(StateManager.RESTART);
+ performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.CLOSE_ACTIVE);
+ performValidTransition(StateManager.CLOSE_COMPLETE);
+
+ _manager = new StateManager(this);
+ performValidTransition(StateManager.CONFIGURE);
+ performValidTransition(StateManager.CONFIGURE_COMPLETE);
+ performValidTransition(StateManager.RECOVER);
+ performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.QUIESCE);
+ performValidTransition(StateManager.QUIESCE_COMPLETE);
+ performValidTransition(StateManager.CLOSE_QUIESCED);
+ performValidTransition(StateManager.CLOSE_COMPLETE);
+ }
+
+ private void performValidTransition(StateManager.Transition transition)
+ {
+ _manager.attainState(transition.getEndState());
+ assertEquals("Unexpected end state", transition.getEndState(), _manager.getState());
+ assertEquals("Unexpected event", transition.getEvent(), _event);
+ _event = null;
+ }
+
+ public void testInvalidStateTransitions()
+ {
+ assertEquals(State.INITIAL, _manager.getState());
+
+
+ performInvalidTransitions(StateManager.CONFIGURE, State.CONFIGURED);
+ performInvalidTransitions(StateManager.CONFIGURE_COMPLETE, State.RECOVERING);
+ performInvalidTransitions(StateManager.RECOVER, State.ACTIVE);
+ performInvalidTransitions(StateManager.ACTIVATE, State.QUIESCING, State.CLOSING);
+ performInvalidTransitions(StateManager.QUIESCE, State.QUIESCED);
+ performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.RECOVERING, State.CLOSING);
+ performInvalidTransitions(StateManager.CLOSE_QUIESCED, State.CLOSED);
+ performInvalidTransitions(StateManager.CLOSE_COMPLETE);
+
+
+
+
+ }
+
+ private void performInvalidTransitions(StateManager.Transition preTransition, State... validTransitions)
+ {
+ if(preTransition != null)
+ {
+ performValidTransition(preTransition);
+ }
+
+ EnumSet<State> nextStates = EnumSet.allOf(State.class);
+
+ if(validTransitions != null)
+ {
+ for(State state: validTransitions)
+ {
+ nextStates.remove(state);
+ }
+ }
+
+ for(State nextState : nextStates)
+ {
+ performInvalidStateTransition(nextState);
+ }
+
+
+ }
+
+ private void performInvalidStateTransition(State state)
+ {
+ try
+ {
+ _event = null;
+ State startState = _manager.getState();
+ _manager.attainState(state);
+ fail("Invalid state transition performed: " + startState + " to " + state);
+ }
+ catch(IllegalStateException e)
+ {
+ // pass
+ }
+ assertNull("No event should have be fired", _event);
+ }
+
+ public void event(Event event)
+ {
+ _event = event;
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
index a737836ed5..44070f22ad 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
@@ -19,17 +19,13 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.store.decorators.EventDecorator;
-import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator;
-
public class TestableMemoryMessageStoreFactory implements MessageStoreFactory
{
@Override
- public MessageStore createMessageStore(LogSubject logSubject)
+ public MessageStore createMessageStore()
{
- return new OperationalLoggingDecorator(new EventDecorator(new TestableMemoryMessageStore()), logSubject);
+ return new TestableMemoryMessageStore();
}
@Override
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java
deleted file mode 100644
index 7038b8710b..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/store/decorators/EventDecoratorTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.decorators;
-
-import static org.mockito.Mockito.*;
-
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.decorators.EventDecorator;
-import org.mockito.InOrder;
-
-import junit.framework.TestCase;
-
-public class EventDecoratorTest extends TestCase
-{
- private MessageStore _mockStore = mock(MessageStore.class);
- private EventListener _mockListener = mock(EventListener.class);
-
- private EventDecorator _eventDecorator = new EventDecorator(_mockStore);
- private InOrder _orderMock = inOrder(_mockListener, _mockStore);
-
- public void testBeforeActivateDecoration() throws Exception
- {
- _eventDecorator.addEventListener(_mockListener, Event.BEFORE_ACTIVATE);
- _eventDecorator.activate();
-
- _orderMock.verify(_mockListener).event(Event.BEFORE_ACTIVATE);
- _orderMock.verify(_mockStore).activate();
- }
-
- public void testAfterActivateDecoration() throws Exception
- {
- _eventDecorator.addEventListener(_mockListener, Event.AFTER_ACTIVATE);
- _eventDecorator.activate();
-
- _orderMock.verify(_mockStore).activate();
- _orderMock.verify(_mockListener).event(Event.AFTER_ACTIVATE);
- }
-
- public void testBeforeAfterActivateDecoration() throws Exception
- {
- _eventDecorator.addEventListener(_mockListener, Event.BEFORE_ACTIVATE);
- _eventDecorator.addEventListener(_mockListener, Event.AFTER_ACTIVATE);
- _eventDecorator.activate();
-
- _orderMock.verify(_mockListener).event(Event.BEFORE_ACTIVATE);
- _orderMock.verify(_mockStore).activate();
- _orderMock.verify(_mockListener).event(Event.AFTER_ACTIVATE);
- }
-
- public void testBeforeAfterCloseDecoration() throws Exception
- {
- _eventDecorator.addEventListener(_mockListener, Event.BEFORE_CLOSE);
- _eventDecorator.addEventListener(_mockListener, Event.AFTER_CLOSE);
- _eventDecorator.close();
-
- _orderMock.verify(_mockListener).event(Event.BEFORE_CLOSE);
- _orderMock.verify(_mockStore).close();
- _orderMock.verify(_mockListener).event(Event.AFTER_CLOSE);
- }
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java
deleted file mode 100644
index cf06d3ab72..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecoratorTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.decorators;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import static org.apache.qpid.server.store.MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator;
-import org.hamcrest.Description;
-import org.mockito.ArgumentMatcher;
-import org.mockito.InOrder;
-
-public class OperationalLoggingDecoratorTest extends TestCase
-{
- private MessageStore _messageStore = mock(MessageStore.class);
- private LogActor _mockActor = mock(LogActor.class);
- private LogSubject _mockLogSubject = mock(LogSubject.class);
- private OperationalLoggingDecorator _operationalLoggingDecorator = new OperationalLoggingDecorator(_messageStore, _mockLogSubject);
- private InOrder _inOrder = inOrder(_mockActor, _messageStore);
-
- protected void setUp() throws Exception
- {
- super.setUp();
- CurrentActor.set(_mockActor);
- }
-
- public void testConfigureMessageStore() throws Exception
- {
- _operationalLoggingDecorator.configureMessageStore(null,null,null,null);
-
- _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1001 : Created"));
- _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("TXN-1001 : Created"));
- _inOrder.verify(_messageStore).configureMessageStore(anyString(), any(MessageStoreRecoveryHandler.class), any(TransactionLogRecoveryHandler.class), any(Configuration.class));
- }
-
- public void testConfigureMessageStoreWithStoreLocation() throws Exception
- {
- final String storeLocation = "/my/store/location";
- Configuration mockConfig = mock(Configuration.class);
- when(mockConfig.getString(ENVIRONMENT_PATH_PROPERTY)).thenReturn(storeLocation);
-
- _operationalLoggingDecorator.configureMessageStore(null,null,null, mockConfig);
-
- _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1001 : Created"));
- _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("TXN-1001 : Created"));
- _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1002 : Store location : " + storeLocation));
- _inOrder.verify(_messageStore).configureMessageStore(anyString(), any(MessageStoreRecoveryHandler.class), any(TransactionLogRecoveryHandler.class), any(Configuration.class));
- }
-
- public void testConfigureConfigStore() throws Exception
- {
- _operationalLoggingDecorator.configureConfigStore(null,null,null);
-
- _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("CFG-1001 : Created"));
- _inOrder.verify(_messageStore).configureConfigStore(anyString(), any(ConfigurationRecoveryHandler.class), any(Configuration.class));
- }
-
- public void testActivate() throws Exception
- {
- _operationalLoggingDecorator.activate();
-
- _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1004 : Recovery Start"));
- _inOrder.verify(_messageStore).activate();
- _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1006 : Recovery Complete"));
- }
-
- public void testClose() throws Exception
- {
- _operationalLoggingDecorator.close();
-
- _inOrder.verify(_mockActor).message(eq(_mockLogSubject), matchesLogMessage("MST-1003 : Closed"));
- _inOrder.verify(_messageStore).close();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- super.tearDown();
- CurrentActor.remove();
- }
-
- private LogMessage matchesLogMessage(String expectedLogMessage)
- {
- return argThat(new LogMessageArgumentMatcher(expectedLogMessage));
- }
-
- private final class LogMessageArgumentMatcher extends ArgumentMatcher<LogMessage>
- {
- private final String _expectedText;
- private String _description = null;
-;
- public LogMessageArgumentMatcher(String _expectedLogMessage)
- {
- this._expectedText = _expectedLogMessage;
- }
-
- @Override
- public boolean matches(Object item)
- {
- LogMessage logMessage = (LogMessage) item;
- final String actualText = logMessage.toString();
- if (actualText.equals(_expectedText))
- {
- return true;
- }
- else
- {
- _description = "Expected <" + _expectedText + "> but got <" + actualText + ">";
- return false;
- }
- }
-
- @Override
- public void describeTo(Description description)
- {
- if (description != null)
- {
- description.appendText(" : "+ _description);
- }
- }
- }
-} \ No newline at end of file
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 6b48d55fae..5a11a7aa32 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -98,7 +98,7 @@ public class InternalBrokerBaseCase extends QpidTestCase
_virtualHost.getBindingFactory().addBinding(QUEUE_NAME.toString(), _queue, defaultExchange, null);
_virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
- _messageStore = _virtualHost.getMessageStore().getUnderlyingStore();
+ _messageStore = _virtualHost.getMessageStore();
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()), false, new AMQShortString("testowner"),
false, false, _virtualHost, null);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 2de8b7cdb7..69cc64c0d0 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -358,14 +358,15 @@ public class SlowMessageStore implements MessageStore
}
@Override
- public void addEventListener(EventListener eventListener, Event event)
+ public void addEventListener(EventListener eventListener, Event... events)
{
- throw new UnsupportedOperationException();
+ _realStore.addEventListener(eventListener, events);
}
@Override
- public MessageStore getUnderlyingStore()
+ public String getStoreLocation()
{
- return _realStore.getUnderlyingStore();
+ return _realStore.getStoreLocation();
}
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java
index 840362485e..6497a640d2 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java
@@ -19,16 +19,13 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.store.decorators.EventDecorator;
-
public class SlowMessageStoreFactory implements MessageStoreFactory
{
@Override
- public MessageStore createMessageStore(LogSubject logSubject)
+ public MessageStore createMessageStore()
{
- return new EventDecorator(new SlowMessageStore());
+ return new SlowMessageStore();
}
@Override