diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-13 10:52:27 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-13 10:52:27 +0000 |
commit | e823be1ce23fc8970afc7f437eb84c164c70d837 (patch) | |
tree | e23f4c64cb756b9e770e795f1a250de02f1cfe1c | |
parent | 81b0b0fb508770fc88c8a2283b5d497c6efe90dc (diff) | |
download | qpid-python-e823be1ce23fc8970afc7f437eb84c164c70d837.tar.gz |
Merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1631345 13f79535-47bb-0310-9956-ffa450edef68
15 files changed, 198 insertions, 54 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index 508aaf7518..b0b31d6e7b 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -300,6 +300,12 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi } } + Message receiveRecoveredMessage() + { + return _replaymessages.isEmpty() ? null : _replaymessages.remove(0); + + } + Message receive0(final long timeout) { diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index 0962e4aa37..945ae1c655 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -922,7 +922,15 @@ public class SessionImpl implements Session, QueueSession, TopicSession else { consumer = _messageConsumerList.remove(0); - msg = consumer.receive0(0L); + msg = consumer.receiveRecoveredMessage(); + if(msg == null) + { + msg = consumer.receive0(0L); + } + else + { + recoveredMessage = true; + } } MessageListener listener = consumer._messageListener; diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 4602c67fba..87de4eae4e 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -107,7 +107,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { new Upgrader(getEnvironmentFacade().getEnvironment(), getParent()).upgradeIfNecessary(); } - catch(DatabaseException e) + catch(RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Cannot upgrade store", e); } @@ -138,7 +138,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore MESSAGE_METADATA_SEQ_CONFIG); newMessageId = mmdSeq.get(null, 1); } - catch (DatabaseException de) + catch (RuntimeException de) { throw getEnvironmentFacade().handleDatabaseException("Cannot get sequence value for new message", de); } @@ -216,7 +216,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); } @@ -259,7 +259,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore entries.add(entry); } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); } @@ -306,7 +306,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e); } @@ -350,7 +350,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore return mdd; } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error reading message metadata for message with id " + messageId @@ -424,7 +424,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore tx.abort(); } } - catch(DatabaseException e2) + catch(RuntimeException e2) { getLogger().warn( "Unable to abort transaction after LockConflictException on removal of message with id " @@ -465,7 +465,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } while(!complete); } - catch (DatabaseException e) + catch (RuntimeException e) { getLogger().error("Unexpected BDB exception", e); @@ -550,7 +550,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } return written; } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id " + messageId @@ -587,7 +587,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id " + messageId @@ -618,7 +618,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } } - catch (DatabaseException e) + catch (RuntimeException e) { throw environmentFacade.handleDatabaseException("Cannot visit messages", e); } @@ -630,7 +630,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { cursor.close(); } - catch(DatabaseException e) + catch(RuntimeException e) { throw environmentFacade.handleDatabaseException("Cannot close cursor", e); } @@ -659,7 +659,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - catch (DatabaseException e) + catch (RuntimeException e) { throw environmentFacade.handleDatabaseException("Cannot visit messages", e); } @@ -697,7 +697,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error writing AMQMessage with id " + messageId @@ -740,7 +740,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore getLogger().debug("Storing message metadata for message id " + messageId + " in transaction " + tx); } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error writing message metadata with id " + messageId @@ -779,7 +779,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } getDeliveryDb().put(tx, key, value); } - catch (DatabaseException e) + catch (RuntimeException e) { getLogger().error("Failed to enqueue: " + e.getMessage(), e); throw getEnvironmentFacade().handleDatabaseException("Error writing enqueued message with id " @@ -838,7 +838,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - catch (DatabaseException e) + catch (RuntimeException e) { getLogger().error("Failed to dequeue message " + messageId + " in transaction " + tx, e); @@ -879,7 +879,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore getXidDb().put(txn, key, value); return postActions; } - catch (DatabaseException e) + catch (RuntimeException e) { getLogger().error("Failed to write xid: " + e.getMessage(), e); throw getEnvironmentFacade().handleDatabaseException("Error writing xid to database", e); @@ -910,7 +910,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - catch (DatabaseException e) + catch (RuntimeException e) { getLogger().error("Failed to remove xid in transaction " + txn, e); @@ -963,7 +963,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { tx.abort(); } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error aborting transaction: " + e.getMessage(), e); } @@ -975,7 +975,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { storedSizeChange(delta); } - catch(DatabaseException e) + catch(RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Stored size change exception", e); } @@ -1415,7 +1415,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore txn = getEnvironmentFacade().getEnvironment().beginTransaction( null, null); } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("failed to begin transaction", e); } @@ -1476,7 +1476,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { _txn = getEnvironmentFacade().getEnvironment().beginTransaction(null, null); } - catch(DatabaseException e) + catch(RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Cannot create store transaction", e); } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java index 59bc5aad88..5550381c9c 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java @@ -140,7 +140,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi } _initialRecords = new ConfiguredObjectRecord[0]; } - catch(DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Cannot upgrade store", e); } @@ -156,7 +156,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi doVisitAllConfiguredObjectRecords(handler); handler.end(); } - catch (DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e); } @@ -243,7 +243,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi _environmentFacade.close(); _environmentFacade = null; } - catch(DatabaseException e) + catch (RuntimeException e) { throw new StoreException("Exception occurred on message store close", e); } @@ -268,7 +268,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi txn.commit(); txn = null; } - catch (DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject + " in database: " + e.getMessage(), e); @@ -305,7 +305,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi txn = null; return removed.toArray(new UUID[removed.size()]); } - catch (DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Error deleting configured objects from database", e); } @@ -334,7 +334,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi txn.commit(); txn = null; } - catch (DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e); } @@ -408,7 +408,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi } writeHierarchyRecords(txn, configuredObject); } - catch (DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject + " to database: " + e.getMessage(), e); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java index 8504971a7a..2d8b113310 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java @@ -38,7 +38,7 @@ public class BDBUtils { cursor.close(); } - catch(DatabaseException e) + catch (RuntimeException e) { // We need the possible side effect of the facade restarting the environment but don't care about the exception throw environmentFacade.handleDatabaseException("Cannot close cursor", e); @@ -55,7 +55,7 @@ public class BDBUtils tx.abort(); } } - catch (DatabaseException e) + catch (RuntimeException e) { // We need the possible side effect of the facade restarting the environment but don't care about the exception environmentFacade.handleDatabaseException("Cannot abort transaction", e); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index bd078bb0f7..a42bc43a5e 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -57,7 +57,7 @@ public interface EnvironmentFacade StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync); - DatabaseException handleDatabaseException(String contextMessage, DatabaseException e); + RuntimeException handleDatabaseException(String contextMessage, RuntimeException e); String getStoreLocation(); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index a3b3ec7324..685731b5f0 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -37,6 +37,7 @@ import com.sleepycat.je.SequenceConfig; import com.sleepycat.je.Transaction; import org.apache.log4j.Logger; +import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; public class StandardEnvironmentFacade implements EnvironmentFacade @@ -266,13 +267,17 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } @Override - public DatabaseException handleDatabaseException(String contextMessage, DatabaseException e) + public RuntimeException handleDatabaseException(String contextMessage, RuntimeException e) { if (_environment != null && !_environment.isValid()) { closeEnvironmentSafely(); } - return e; + if (e instanceof StoreException) + { + return e; + } + return new StoreException("Unexpected exception occurred on store operation", e); } @Override diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java index 0dff8417ca..03b1425354 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java @@ -79,7 +79,7 @@ public class DatabasePinger } } } - catch (DatabaseException de) + catch (RuntimeException de) { facade.handleDatabaseException("DatabaseException from DatabasePinger ", de); } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 532cffdac8..8877f6a156 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -82,6 +82,7 @@ import com.sleepycat.je.rep.vlsn.VLSNRange; import com.sleepycat.je.utilint.PropUtil; import com.sleepycat.je.utilint.VLSN; import org.apache.log4j.Logger; +import org.apache.qpid.server.store.StoreException; import org.codehaus.jackson.map.ObjectMapper; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -371,26 +372,40 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } @Override - public DatabaseException handleDatabaseException(String contextMessage, final DatabaseException dbe) + public RuntimeException handleDatabaseException(String contextMessage, final RuntimeException dbe) { - boolean noMajority = dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientAcksException; - - if (noMajority) + if (dbe instanceof StoreException || dbe instanceof ConnectionScopedRuntimeException) { - ReplicationGroupListener listener = _replicationGroupListener.get(); - if (listener != null) + return dbe; + } + else if (dbe instanceof DatabaseException) + { + boolean noMajority = dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientAcksException; + + if (noMajority) + { + ReplicationGroupListener listener = _replicationGroupListener.get(); + if (listener != null) + { + listener.onNoMajority(); + } + } + + boolean restart = (noMajority || dbe instanceof RestartRequiredException); + if (restart) { - listener.onNoMajority(); + tryToRestartEnvironment((DatabaseException)dbe); + return new ConnectionScopedRuntimeException(noMajority ? "Required number of nodes not reachable" : "Underlying JE environment is being restarted", dbe); } } - - boolean restart = (noMajority || dbe instanceof RestartRequiredException); - if (restart) + else { - tryToRestartEnvironment(dbe); - throw new ConnectionScopedRuntimeException(noMajority ? "Required number of nodes not reachable" : "Underlying JE environment is being restarted", dbe); + if (dbe instanceof IllegalStateException && getFacadeState() == State.RESTARTING) + { + return new ConnectionScopedRuntimeException("Underlying JE environment is being restarted", dbe); + } } - return dbe; + return new StoreException("Unexpected exception occurred in replicated environment", dbe); } private void tryToRestartEnvironment(final DatabaseException dbe) @@ -452,12 +467,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } if (_state.get() != State.OPEN) { - throw new IllegalStateException("Environment facade is not in opened state"); + throw new ConnectionScopedRuntimeException("Environment facade is not in opened state"); } if (!_environment.isValid()) { - throw new IllegalStateException("Environment is not valid"); + throw new ConnectionScopedRuntimeException("Environment is not valid"); } Database cachedHandle = _cachedDatabases.get(name); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 0003e74dbc..db2e1277c7 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -343,6 +343,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu String nodeAddress = node.getHostName() + ":" + node.getPort(); if (!_permittedNodes.contains(nodeAddress)) { + getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress)); shutdownOnIntruder(nodeAddress); throw new IllegalStateException("Intruder node detected: " + nodeAddress); } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 0c8a63eb5b..fb0c11f6e5 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -283,7 +283,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase catch (RuntimeException e) { assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id " - + messageid_0_8 + "!", e.getMessage()); + + messageid_0_8 + "!", e.getCause().getMessage()); } // buffer is smaller then message size diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 69d44bbe88..bef5a87217 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.test.utils.PortHelper; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; @@ -146,6 +147,24 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertNotSame("Expecting a new handle after database closure", handle1, handle3); } + public void testOpenDatabaseWhenFacadeIsNotOpened() throws Exception + { + DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true); + + EnvironmentFacade ef = createMaster(); + ef.close(); + + try + { + ef.openDatabase("myDatabase", createIfAbsentDbConfig ); + fail("Database open should fail"); + } + catch(ConnectionScopedRuntimeException e) + { + assertEquals("Unexpected exception", "Environment facade is not in opened state", e.getMessage()); + } + } + public void testGetGroupName() throws Exception { assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName()); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java index 4156fc8157..064c9788b2 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java @@ -50,6 +50,8 @@ public class BrokerMessages public static final String STOPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stopped"; public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stats_msgs"; public static final String LISTENING_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.listening"; + public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_inactive"; + public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_active"; public static final String MAX_MEMORY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.max_memory"; public static final String PLATFORM_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.platform"; public static final String SHUTTING_DOWN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.shutting_down"; @@ -66,6 +68,8 @@ public class BrokerMessages Logger.getLogger(STOPPED_LOG_HIERARCHY); Logger.getLogger(STATS_MSGS_LOG_HIERARCHY); Logger.getLogger(LISTENING_LOG_HIERARCHY); + Logger.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY); + Logger.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY); Logger.getLogger(MAX_MEMORY_LOG_HIERARCHY); Logger.getLogger(PLATFORM_LOG_HIERARCHY); Logger.getLogger(SHUTTING_DOWN_LOG_HIERARCHY); @@ -265,6 +269,70 @@ public class BrokerMessages /** * Log a Broker message of the Format: + * <pre>BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2) + { + String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE"); + + final Object[] messageArguments = {param1, param2}; + // Create a new MessageFormat to ensure thread safety. + // Sharing a MessageFormat and using applyPattern is not thread safe + MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); + + final String message = formatter.format(messageArguments); + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY; + } + }; + } + + /** + * Log a Broker message of the Format: + * <pre>BRK-1014 : Message flow to disk active : Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2) + { + String rawMessage = _messages.getString("FLOW_TO_DISK_ACTIVE"); + + final Object[] messageArguments = {param1, param2}; + // Create a new MessageFormat to ensure thread safety. + // Sharing a MessageFormat and using applyPattern is not thread safe + MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); + + final String message = formatter.format(messageArguments); + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY; + } + }; + } + + /** + * Log a Broker message of the Format: * <pre>BRK-1011 : Maximum Memory : {0,number} bytes</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties b/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties index 76c1fa1b5b..d764145ec8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties @@ -47,4 +47,9 @@ PLATFORM = BRK-1010 : Platform : JVM : {0} version: {1} OS : {2} version: {3} ar # 0 Maximum Memory MAX_MEMORY = BRK-1011 : Maximum Memory : {0,number} bytes -MANAGEMENT_MODE = BRK-1012 : Management Mode : User Details : {0} / {1}
\ No newline at end of file +MANAGEMENT_MODE = BRK-1012 : Management Mode : User Details : {0} / {1} + +# 0 - Total message size +# 1 - Target memory size +FLOW_TO_DISK_ACTIVE = BRK-1014 : Message flow to disk active : Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB +FLOW_TO_DISK_INACTIVE = BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB
\ No newline at end of file diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 109aaff5bd..ee008b1642 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -82,6 +82,9 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple private Timer _reportingTimer; private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + /** Flags used to control the reporting of flow to disk. Protected by this */ + private boolean _totalMessageSizeExceedThresholdReported = false, _totalMessageSizeWithinThresholdReported = true; + @ManagedAttributeField private String _defaultVirtualHost; @ManagedAttributeField @@ -99,6 +102,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple @ManagedAttributeField private String _confidentialConfigurationEncryptionProvider; + @ManagedObjectFactoryConstructor public BrokerAdapter(Map<String, Object> attributes, SystemConfig parent) @@ -437,6 +441,19 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } } + if (totalSize > totalTarget && !_totalMessageSizeExceedThresholdReported) + { + _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, totalTarget / 1024)); + _totalMessageSizeExceedThresholdReported = true; + _totalMessageSizeWithinThresholdReported = false; + } + else if (totalSize <= totalTarget && !_totalMessageSizeWithinThresholdReported) + { + _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, totalTarget / 1024)); + _totalMessageSizeWithinThresholdReported = true; + _totalMessageSizeExceedThresholdReported = false; + } + for(Map.Entry<VirtualHost<?, ?, ?>,Long> entry : vhs.entrySet()) { |