/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.isA; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; import java.io.File; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQStoreException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockStoredMessage; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.Transaction.Record; import org.apache.qpid.server.store.derby.DerbyMessageStore; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; public class DurableConfigurationStoreTest extends QpidTestCase { private static final String EXCHANGE_NAME = "exchangeName"; private String _storePath; private String _storeName; private MessageStore _store; private Configuration _configuration; private ConfigurationRecoveryHandler _recoveryHandler; private QueueRecoveryHandler _queueRecoveryHandler; private ExchangeRecoveryHandler _exchangeRecoveryHandler; private BindingRecoveryHandler _bindingRecoveryHandler; private ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler _linkRecoveryHandler; private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; private TransactionLogRecoveryHandler _logRecoveryHandler; private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; private Exchange _exchange = mock(Exchange.class); private static final String ROUTING_KEY = "routingKey"; private static final String QUEUE_NAME = "queueName"; private FieldTable _bindingArgs; private UUID _queueId; private UUID _exchangeId; public void setUp() throws Exception { super.setUp(); _queueId = UUIDGenerator.generateRandomUUID(); _exchangeId = UUIDGenerator.generateRandomUUID(); _storeName = getName(); _storePath = TMP_FOLDER + File.separator + _storeName; FileUtils.delete(new File(_storePath), true); setTestSystemProperty("QPID_WORK", TMP_FOLDER); _configuration = mock(Configuration.class); _recoveryHandler = mock(ConfigurationRecoveryHandler.class); _queueRecoveryHandler = mock(QueueRecoveryHandler.class); _exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class); _bindingRecoveryHandler = mock(BindingRecoveryHandler.class); _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler); when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler); when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler); when(_bindingRecoveryHandler.completeBindingRecovery()).thenReturn(_linkRecoveryHandler); when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); when(_exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(EXCHANGE_NAME)); when(_exchange.getId()).thenReturn(_exchangeId); when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn( _storePath); _bindingArgs = new FieldTable(); AMQShortString argKey = AMQPFilterTypes.JMS_SELECTOR.getValue(); String argValue = "some selector expression"; _bindingArgs.put(argKey, argValue); reopenStore(); } public void tearDown() throws Exception { FileUtils.delete(new File(_storePath), true); super.tearDown(); } public void testCreateExchange() throws Exception { Exchange exchange = createTestExchange(); _store.createExchange(exchange); reopenStore(); verify(_exchangeRecoveryHandler).exchange(_exchangeId, getName(), getName() + "Type", true); } public void testRemoveExchange() throws Exception { Exchange exchange = createTestExchange(); _store.createExchange(exchange); _store.removeExchange(exchange); reopenStore(); verify(_exchangeRecoveryHandler, never()).exchange(any(UUID.class), anyString(), anyString(), anyBoolean()); } public void testBindQueue() throws Exception { AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), null, ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); _store.bindQueue(binding); reopenStore(); ByteBuffer argsAsBytes = ByteBuffer.wrap(_bindingArgs.getDataAsBytes()); verify(_bindingRecoveryHandler).binding(binding.getId(), _exchange.getId(), queue.getId(), ROUTING_KEY, argsAsBytes); } public void testUnbindQueue() throws Exception { AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), null, ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); _store.bindQueue(binding); _store.unbindQueue(binding); reopenStore(); verify(_bindingRecoveryHandler, never()).binding(any(UUID.class), any(UUID.class), any(UUID.class), anyString(), isA(ByteBuffer.class)); } public void testCreateQueueAMQQueue() throws Exception { AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); _store.createQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, null); } public void testCreateQueueAMQQueueFieldTable() throws Exception { AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map attributes = new HashMap(); attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); _store.createQueue(queue, arguments); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, arguments, null); } public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception { Exchange alternateExchange = createTestAlternateExchange(); AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange); _store.createQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, alternateExchange.getId()); } private Exchange createTestAlternateExchange() { UUID exchUuid = UUID.randomUUID(); Exchange alternateExchange = mock(Exchange.class); when(alternateExchange.getId()).thenReturn(exchUuid); return alternateExchange; } public void testUpdateQueueExclusivity() throws Exception { // create queue AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map attributes = new HashMap(); attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); _store.createQueue(queue, arguments); // update the queue to have exclusive=false queue = createTestQueue(getName(), getName() + "Owner", false); _store.updateQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, null); } public void testUpdateQueueAlternateExchange() throws Exception { // create queue AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map attributes = new HashMap(); attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); _store.createQueue(queue, arguments); // update the queue to have exclusive=false Exchange alternateExchange = createTestAlternateExchange(); queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange); _store.updateQueue(queue); reopenStore(); verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, alternateExchange.getId()); } public void testRemoveQueue() throws Exception { // create queue AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map attributes = new HashMap(); attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); _store.createQueue(queue, arguments); // remove queue _store.removeQueue(queue); reopenStore(); verify(_queueRecoveryHandler, never()).queue(any(UUID.class), anyString(), anyString(), anyBoolean(), any(FieldTable.class), any(UUID.class)); } private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException { return createTestQueue(queueName, queueOwner, exclusive, null); } private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive, Exchange alternateExchange) throws AMQStoreException { AMQQueue queue = mock(AMQQueue.class); when(queue.getName()).thenReturn(queueName); when(queue.getNameShortString()).thenReturn(AMQShortString.valueOf(queueName)); when(queue.getOwner()).thenReturn(AMQShortString.valueOf(queueOwner)); when(queue.isExclusive()).thenReturn(exclusive); when(queue.getId()).thenReturn(_queueId); when(queue.getAlternateExchange()).thenReturn(alternateExchange); return queue; } private Exchange createTestExchange() { Exchange exchange = mock(Exchange.class); when(exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(getName())); when(exchange.getName()).thenReturn(getName()); when(exchange.getTypeShortString()).thenReturn(AMQShortString.valueOf(getName() + "Type")); when(exchange.isAutoDelete()).thenReturn(true); when(exchange.getId()).thenReturn(_exchangeId); return exchange; } private void reopenStore() throws Exception { if (_store != null) { _store.close(); } _store = createStore(); _store.configureConfigStore(_storeName, _recoveryHandler, _configuration); _store.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration); _store.activate(); } protected MessageStore createStore() throws Exception { String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY); if (storeClass == null) { storeClass = DerbyMessageStore.class.getName(); } CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); MessageStore messageStore = (MessageStore) Class.forName(storeClass).newInstance(); return messageStore; } public void testRecordXid() throws Exception { Record enqueueRecord = getTestRecord(1); Record dequeueRecord = getTestRecord(2); Record[] enqueues = { enqueueRecord }; Record[] dequeues = { dequeueRecord }; byte[] globalId = new byte[] { 1 }; byte[] branchId = new byte[] { 2 }; Transaction transaction = _store.newTransaction(); transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); transaction.commitTran(); reopenStore(); verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); transaction = _store.newTransaction(); transaction.removeXid(1l, globalId, branchId); transaction.commitTran(); reopenStore(); verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues); } private Record getTestRecord(long messageNumber) { UUID queueId1 = UUIDGenerator.generateRandomUUID(); TransactionLogResource queue1 = mock(TransactionLogResource.class); when(queue1.getId()).thenReturn(queueId1); EnqueableMessage message1 = mock(EnqueableMessage.class); when(message1.isPersistent()).thenReturn(true); when(message1.getMessageNumber()).thenReturn(messageNumber); when(message1.getStoredMessage()).thenReturn(new MockStoredMessage(messageNumber)); Record enqueueRecord = new TestRecord(queue1, message1); return enqueueRecord; } private static class TestRecord implements Record { private TransactionLogResource _queue; private EnqueableMessage _message; public TestRecord(TransactionLogResource queue, EnqueableMessage message) { super(); _queue = queue; _message = message; } @Override public TransactionLogResource getQueue() { return _queue; } @Override public EnqueableMessage getMessage() { return _message; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode()); result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj == null) { return false; } if (!(obj instanceof Record)) { return false; } Record other = (Record) obj; if (_message == null && other.getMessage() != null) { return false; } if (_queue == null && other.getQueue() != null) { return false; } if (_message.getMessageNumber() != other.getMessage().getMessageNumber()) { return false; } return _queue.getId().equals(other.getQueue().getId()); } } }