diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:15:31 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:15:31 +0000 |
commit | e0e8f4c5087c1c5dc787740d6bd862755bd8daf1 (patch) | |
tree | da456e90c9b57ba69f255e564c2945aaf2c7617b | |
parent | 78a00e2a3a1bbc7486de0fad72603617958062c3 (diff) | |
download | qpid-python-e0e8f4c5087c1c5dc787740d6bd862755bd8daf1.tar.gz |
Merging from trunk r1617822:1618206 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620339 13f79535-47bb-0310-9956-ffa450edef68
45 files changed, 639 insertions, 1343 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index 1a72e129e7..508aaf7518 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -173,7 +173,10 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi } else { - throw new JMSException(e.getMessage(), error.getCondition().getValue().toString()); + JMSException jmsException = + new JMSException(e.getMessage(), error.getCondition().getValue().toString()); + jmsException.initCause(e); + throw jmsException; } } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java index 302060776a..82f29ea4b1 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java @@ -34,7 +34,7 @@ public class ConnectionErrorException extends ConnectionException public ConnectionErrorException(Error remoteError) { - super(remoteError.getDescription()); + super(remoteError.getDescription() == null ? remoteError.toString() : remoteError.getDescription()); _remoteError = remoteError; } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index a2a15779d2..826a757850 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -194,9 +194,14 @@ public class Receiver implements DeliveryStateHandler } catch (InterruptedException e) { - throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted whil waiting for detach following failed attach"); + throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted while waiting for detach following failed attach"); } - throw new ConnectionErrorException(getError()); + throw new ConnectionErrorException(getError().getCondition(), + getError().getDescription() == null + ? "AMQP error: '" + getError().getCondition().toString() + + "' when attempting to create a receiver" + + (source != null ? " from: '" + source.getAddress() +"'" : "") + : getError().getDescription()); } else { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/VirtualHostNodeLogSubject.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/logging/subjects/BDBHAVirtualHostNodeLogSubject.java index fad9a91841..a209062993 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/VirtualHostNodeLogSubject.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/logging/subjects/BDBHAVirtualHostNodeLogSubject.java @@ -20,14 +20,13 @@ */ package org.apache.qpid.server.logging.subjects; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.VIRTUAL_HOST_NODE_FORMAT; -import org.apache.qpid.server.model.VirtualHostNode; - -public class VirtualHostNodeLogSubject extends AbstractLogSubject +public class BDBHAVirtualHostNodeLogSubject extends AbstractLogSubject { - public VirtualHostNodeLogSubject(String nodeName) + public static final String VIRTUAL_HOST_NODE_FORMAT = "grp(/{0})/vhn(/{1})"; + + public BDBHAVirtualHostNodeLogSubject(String groupName, String nodeName) { - setLogStringWithFormat(VIRTUAL_HOST_NODE_FORMAT, nodeName); + setLogStringWithFormat(VIRTUAL_HOST_NODE_FORMAT, groupName, nodeName); } } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/logging/subjects/GroupLogSubject.java index ec67fc68b3..51fd1fc2dc 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/logging/subjects/GroupLogSubject.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,40 +18,15 @@ * under the License. * */ -package org.apache.qpid.server.util; - -public class RunStats -{ - private long min = Long.MAX_VALUE; - private long max; - private long total; - private int count; +package org.apache.qpid.server.logging.subjects; - public void record(long time) - { - max = Math.max(time, max); - min = Math.min(time, min); - total += time; - count++; - } - public long getMin() - { - return min; - } - - public long getMax() - { - return max; - } - - public long getAverage() - { - return total / count; - } +public class GroupLogSubject extends AbstractLogSubject +{ + public static final String GROUP_FORMAT = "grp(/{0})"; - public String toString() + public GroupLogSubject(String groupName) { - return "avg=" + getAverage() + ", min=" + min + ", max=" + max; + setLogStringWithFormat(GROUP_FORMAT, groupName); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 835846a5ec..78cddc708e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -1333,7 +1333,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore data = new byte[0]; } } - return ByteBuffer.wrap(data,offsetInMessage,size); + return ByteBuffer.wrap(data,offsetInMessage,Math.min(size,data.length-offsetInMessage)); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java index 5263f5942f..06671998ec 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java @@ -30,9 +30,14 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import com.sleepycat.je.rep.MasterStateException; +import com.sleepycat.je.rep.ReplicatedEnvironment; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.messages.HighAvailabilityMessages; +import org.apache.qpid.server.logging.subjects.BDBHAVirtualHostNodeLogSubject; +import org.apache.qpid.server.logging.subjects.GroupLogSubject; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -40,6 +45,8 @@ import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.model.SystemConfig; +import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; @@ -53,13 +60,16 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB private volatile long _joinTime; private volatile long _lastTransactionId; + private volatile String _lastReplicatedEnvironmentState = ReplicatedEnvironment.State.UNKNOWN.name(); @ManagedAttributeField(afterSet="afterSetRole") - private volatile String _role; + private volatile String _role = ReplicatedEnvironment.State.UNKNOWN.name(); private final AtomicReference<State> _state; private final boolean _isMonitor; private boolean _detached; + private BDBHAVirtualHostNodeLogSubject _virtualHostNodeLogSubject; + private GroupLogSubject _groupLogSubject; public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNode<?> virtualHostNode, Map<String, Object> attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade) { @@ -92,7 +102,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB @Override public String getRole() { - return _role; + return _lastReplicatedEnvironmentState; } @Override @@ -152,10 +162,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB { String nodeName = getName(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleting node '" + nodeName + "' from group '" + getGroupName() + "'"); - } + getEventLogger().message(_virtualHostNodeLogSubject, HighAvailabilityMessages.DELETED()); try { @@ -178,19 +185,11 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB try { String nodeName = getName(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Trying to transfer master to '" + nodeName + "'"); - } + getEventLogger().message(_groupLogSubject, HighAvailabilityMessages.TRANSFER_MASTER(getName(), getAddress())); _replicatedEnvironmentFacade.transferMasterAsynchronously(nodeName); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("The transfer of mastership to node '" + nodeName + "' has been initiated."); - } } - catch(Exception e) + catch (Exception e) { throw new IllegalConfigurationException("Cannot transfer mastership to '" + getName() + "'", e); } @@ -227,10 +226,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB void setRole(String role) { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug(this + " updating role to : " + role); - } + _lastReplicatedEnvironmentState = role; _role = role; updateModelStateFromRole(role); } @@ -266,4 +262,17 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB { this._detached = detached; } + + @Override + public void onValidate() + { + super.onValidate(); + _virtualHostNodeLogSubject = new BDBHAVirtualHostNodeLogSubject(getGroupName(), getName()); + _groupLogSubject = new GroupLogSubject(getGroupName()); + } + + private EventLogger getEventLogger() + { + return ((SystemConfig)getParent(VirtualHostNode.class).getParent(Broker.class).getParent(SystemConfig.class)).getEventLogger(); + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 5489493f74..fd098a8ef6 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb; import java.net.InetSocketAddress; import java.security.PrivilegedAction; +import java.text.MessageFormat; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -49,6 +50,8 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.Task; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.HighAvailabilityMessages; +import org.apache.qpid.server.logging.subjects.BDBHAVirtualHostNodeLogSubject; +import org.apache.qpid.server.logging.subjects.GroupLogSubject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObject; @@ -75,6 +78,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl> { public static final String VIRTUAL_HOST_NODE_TYPE = "BDB_HA"; + public static final String VIRTUAL_HOST_PRINCIPAL_NAME_FORMAT = "grp(/{0})/vhn(/{1})"; /** * Length of time we synchronously await the a JE mutation to complete. It is not considered an error if we exceed this timeout, although a @@ -87,6 +91,9 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu private final AtomicReference<ReplicatedEnvironmentFacade> _environmentFacade = new AtomicReference<>(); private final AtomicReference<ReplicatedEnvironment.State> _lastReplicatedEnvironmentState = new AtomicReference<>(ReplicatedEnvironment.State.UNKNOWN); + private BDBHAVirtualHostNodeLogSubject _virtualHostNodeLogSubject; + private GroupLogSubject _groupLogSubject; + private String _virtualHostNodePrincipalName; @ManagedAttributeField private String _storePath; @@ -267,7 +274,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ADDED(getName(), getGroupName())); + getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.CREATED()); } protected ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade() @@ -291,8 +298,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu getConfigurationStore().openConfigurationStore(this, false); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ATTACHED(getName(), getGroupName(), getRole())); - getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.CREATED()); getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.STORE_LOCATION(getStorePath())); @@ -325,23 +330,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu ReplicatedEnvironment.State detached = ReplicatedEnvironment.State.DETACHED; _lastReplicatedEnvironmentState.set(detached); attributeSet(ROLE, _role, detached); - - //Perhaps, having STOPPED operational logging could be sufficient. However, on START we still will be seeing 2 logs: ATTACHED and STARTED - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(getName(), getGroupName())); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.STOPPED(getName(), getGroupName())); - } - } - - @StateTransition( currentState = { State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - protected void doActivate() - { - try - { - super.doActivate(); - } - finally - { - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.STARTED(getName(), getGroupName())); } } @@ -359,7 +347,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { Set<InetSocketAddress> helpers = getRemoteNodeAddresses(); super.doDelete(); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED(getName(), getGroupName())); + getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED()); if (getState() == State.DELETED && !helpers.isEmpty()) { try @@ -413,10 +401,18 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu finally { closeEnvironment(); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(getName(), getGroupName())); } } + @Override + public void onValidate() + { + super.onValidate(); + _virtualHostNodeLogSubject = new BDBHAVirtualHostNodeLogSubject(getGroupName(), getName()); + _groupLogSubject = new GroupLogSubject(getGroupName()); + _virtualHostNodePrincipalName = MessageFormat.format(VIRTUAL_HOST_PRINCIPAL_NAME_FORMAT, getGroupName(), getName()); + } + private void onMaster() { try @@ -550,7 +546,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { LOGGER.info("Received BDB event indicating transition to state " + state); } - + String previousRole = getRole(); try { switch (state) @@ -573,10 +569,9 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu finally { _lastReplicatedEnvironmentState.set(state); - String previousRole = _role; attributeSet(ROLE, _role, state.name()); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ROLE_CHANGED(getName(), - getGroupName(), previousRole, state.name())); + getEventLogger().message(getGroupLogSubject(), + HighAvailabilityMessages.ROLE_CHANGED(getName(), getAddress(), previousRole, state.name())); } } } @@ -591,8 +586,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu try { environmentFacade.setPriority(_priority).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.PRIORITY_CHANGED(getName(), - getGroupName(), String.valueOf(_priority))); + getEventLogger().message(getVirtualHostNodeLogSubject(), + HighAvailabilityMessages.PRIORITY_CHANGED(String.valueOf(_priority))); } catch (TimeoutException e) { @@ -619,8 +614,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu try { environmentFacade.setDesignatedPrimary(_designatedPrimary).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED(getName(), - getGroupName(), String.valueOf(_designatedPrimary))); + getEventLogger().message(getVirtualHostNodeLogSubject(), + HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED(String.valueOf(_designatedPrimary))); } catch (TimeoutException e) { @@ -647,8 +642,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu try { environmentFacade.setElectableGroupSizeOverride(_quorumOverride).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED(getName(), - getGroupName(), String.valueOf(_quorumOverride))); + getEventLogger().message(getVirtualHostNodeLogSubject(), + HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED(String.valueOf(_quorumOverride))); } catch (TimeoutException e) { @@ -674,8 +669,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { try { + getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.TRANSFER_MASTER(getName(), getAddress())); environmentFacade.transferMasterToSelfAsynchronously().get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.TRANSFER_MASTER(getName(), getName(), getGroupName())); } catch (TimeoutException e) { @@ -701,18 +696,27 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu return getAddress().equals(getHelperAddress()); } + BDBHAVirtualHostNodeLogSubject getVirtualHostNodeLogSubject() + { + return _virtualHostNodeLogSubject; + } + + GroupLogSubject getGroupLogSubject() + { + return _groupLogSubject; + } + private class RemoteNodesDiscoverer implements ReplicationGroupListener { @Override public void onReplicationNodeAddedToGroup(final ReplicationNode node) { - getTaskExecutor().submit(new Task<Void>() + getTaskExecutor().submit(new VirtualHostNodeGroupTask() { @Override - public Void execute() + public void perform() { addRemoteReplicationNode(node); - return null; } }); } @@ -722,19 +726,18 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade()); remoteNode.create(); childAdded(remoteNode); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ADDED(remoteNode.getName(), getGroupName())); + getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.ADDED(remoteNode.getName(), remoteNode.getAddress())); } @Override public void onReplicationNodeRecovered(final ReplicationNode node) { - getTaskExecutor().submit(new Task<Void>() + getTaskExecutor().submit(new VirtualHostNodeGroupTask() { @Override - public Void execute() + public void perform() { recoverRemoteReplicationNode(node); - return null; } }); } @@ -745,19 +748,18 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu remoteNode.registerWithParents(); remoteNode.open(); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ATTACHED(remoteNode.getName(), getGroupName(), String.valueOf(remoteNode.getState()))); + getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.JOINED(remoteNode.getName(), remoteNode.getAddress())); } @Override public void onReplicationNodeRemovedFromGroup(final ReplicationNode node) { - getTaskExecutor().submit(new Task<Void>() + getTaskExecutor().submit(new VirtualHostNodeGroupTask() { @Override - public Void execute() + public void perform() { removeRemoteReplicationNode(node); - return null; } }); } @@ -768,12 +770,25 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu if (remoteNode != null) { remoteNode.deleted(); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED(remoteNode.getName(), getGroupName())); + getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.REMOVED(remoteNode.getName(), remoteNode.getAddress())); } } @Override - public void onNodeState(ReplicationNode node, NodeState nodeState) + public void onNodeState(final ReplicationNode node, final NodeState nodeState) + { + Subject.doAs(SecurityManager.getSystemTaskSubject(_virtualHostNodePrincipalName), new PrivilegedAction<Void>() + { + @Override + public Void run() + { + processNodeState(node, nodeState); + return null; + } + }); + } + + private void processNodeState(ReplicationNode node, NodeState nodeState) { BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName()); if (remoteNode != null) @@ -785,7 +800,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu remoteNode.setLastTransactionId(-1); if (!remoteNode.isDetached()) { - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(remoteNode.getName(), getGroupName())); + getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.LEFT(remoteNode.getName(), remoteNode.getAddress())); remoteNode.setDetached(true); } } @@ -796,7 +811,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu remoteNode.setRole(nodeState.getNodeState().name()); if (remoteNode.isDetached()) { - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ATTACHED(remoteNode.getName(), getGroupName(), remoteNode.getRole() )); + getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.JOINED(remoteNode.getName(), remoteNode.getAddress())); remoteNode.setDetached(false); } } @@ -804,17 +819,29 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu String newRole = remoteNode.getRole(); if (!newRole.equals(currentRole)) { - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ROLE_CHANGED(remoteNode.getName(), - getGroupName(), currentRole, newRole)); + getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.ROLE_CHANGED(remoteNode.getName(), remoteNode.getAddress(), currentRole, newRole)); } } } @Override - public void onIntruderNode(ReplicationNode node) + public void onIntruderNode(final ReplicationNode node) + { + Subject.doAs(SecurityManager.getSystemTaskSubject(_virtualHostNodePrincipalName), new PrivilegedAction<Void>() + { + @Override + public Void run() + { + processIntruderNode(node); + return null; + } + }); + } + + private void processIntruderNode(ReplicationNode node) { String hostAndPort = node.getHostName() + ":" + node.getPort(); - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), hostAndPort, getGroupName())); + getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), hostAndPort)); boolean inManagementMode = getParent(Broker.class).isManagementMode(); if (inManagementMode) @@ -858,7 +885,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override public void onNoMajority() { - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.MAJORITY_LOST(getName(), getGroupName())); + getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.QUORUM_LOST()); } private Map<String, Object> nodeToAttributes(ReplicationNode replicationNode) @@ -872,4 +899,23 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } + private abstract class VirtualHostNodeGroupTask implements Task<Void> + { + @Override + public Void execute() + { + return Subject.doAs(SecurityManager.getSystemTaskSubject(_virtualHostNodePrincipalName), new PrivilegedAction<Void>() + { + @Override + public Void run() + { + perform(); + return null; + } + }); + } + + abstract void perform(); + } + } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java index 6259b49d61..0d64d87aef 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java @@ -19,6 +19,7 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -33,6 +34,7 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -69,7 +71,7 @@ public class BDBHARemoteReplicationNodeTest extends QpidTestCase // Virtualhost needs the EventLogger from the SystemContext. when(_virtualHostNode.getParent(Broker.class)).thenReturn(_broker); - + doReturn(VirtualHostNode.class).when(_virtualHostNode).getCategoryClass(); ConfiguredObjectFactory objectFactory = _broker.getObjectFactory(); when(_virtualHostNode.getModel()).thenReturn(objectFactory.getModel()); when(_virtualHostNode.getTaskExecutor()).thenReturn(_taskExecutor); @@ -80,7 +82,7 @@ public class BDBHARemoteReplicationNodeTest extends QpidTestCase String remoteReplicationName = getName(); BDBHARemoteReplicationNode remoteReplicationNode = createRemoteReplicationNode(remoteReplicationName); - remoteReplicationNode.setAttribute(BDBHARemoteReplicationNode.ROLE, null, "MASTER"); + remoteReplicationNode.setAttribute(BDBHARemoteReplicationNode.ROLE, "UNKNOWN", "MASTER"); verify(_facade).transferMasterAsynchronously(remoteReplicationName); } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java index ef1021160c..ea7d74090d 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java @@ -78,67 +78,18 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase _helper.assertNodeRole(node1, "MASTER"); - String expectedMessage = HighAvailabilityMessages.ADDED(node1.getName(), node1.getGroupName()).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ADDED_LOG_HIERARCHY))); - - expectedMessage = HighAvailabilityMessages.ATTACHED(node1.getName(), node1.getGroupName(), "UNKNOWN").toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ATTACHED_LOG_HIERARCHY))); + assertEquals("Unexpected VHN log subject", "[grp(/group)/vhn(/node1)] ", node1.getVirtualHostNodeLogSubject().getLogString()); + assertEquals("Unexpected group log subject", "[grp(/group)] ", node1.getGroupLogSubject().getLogString()); - - expectedMessage = HighAvailabilityMessages.STARTED(node1.getName(), node1.getGroupName()).toString(); + String expectedMessage = HighAvailabilityMessages.CREATED().toString(); verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.STARTED_LOG_HIERARCHY))); + argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.CREATED_LOG_HIERARCHY))); - expectedMessage = HighAvailabilityMessages.ROLE_CHANGED(node1.getName(), node1.getGroupName(), "UNKNOWN", "MASTER").toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), + expectedMessage = HighAvailabilityMessages.ROLE_CHANGED(node1.getName(), node1.getAddress(), "UNKNOWN", "MASTER").toString(); + verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())), argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ROLE_CHANGED_LOG_HIERARCHY))); } - public void testStop() throws Exception - { - int node1PortNumber = findFreePort(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); - BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes); - _helper.assertNodeRole(node1, "MASTER"); - reset(_eventLogger); - - node1.stop(); - - String expectedMessage = HighAvailabilityMessages.DETACHED(node1.getName(), node1.getGroupName()).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DETACHED_LOG_HIERARCHY))); - - expectedMessage = HighAvailabilityMessages.STOPPED(node1.getName(), node1.getGroupName()).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.STOPPED_LOG_HIERARCHY))); - } - - public void testClose() throws Exception - { - int node1PortNumber = findFreePort(); - String helperAddress = "localhost:" + node1PortNumber; - String groupName = "group"; - String nodeName = "node1"; - - Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); - BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes); - _helper.assertNodeRole(node1, "MASTER"); - - reset(_eventLogger); - - node1.close(); - - String expectedMessage = HighAvailabilityMessages.DETACHED(node1.getName(), node1.getGroupName()).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DETACHED_LOG_HIERARCHY))); - } - public void testDelete() throws Exception { int node1PortNumber = findFreePort(); @@ -154,13 +105,10 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase node1.delete(); - String expectedMessage = HighAvailabilityMessages.DETACHED(node1.getName(), node1.getGroupName()).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DETACHED_LOG_HIERARCHY))); - - expectedMessage = HighAvailabilityMessages.DELETED(node1.getName(), node1.getGroupName()).toString(); + String expectedMessage = HighAvailabilityMessages.DELETED().toString(); verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DELETED_LOG_HIERARCHY))); + } public void testSetPriority() throws Exception @@ -181,7 +129,7 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase // make sure that task executor thread finishes all scheduled tasks node1.stop(); - String expectedMessage = HighAvailabilityMessages.PRIORITY_CHANGED(node1.getName(), node1.getGroupName(), "10").toString(); + String expectedMessage = HighAvailabilityMessages.PRIORITY_CHANGED("10").toString(); verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.PRIORITY_CHANGED_LOG_HIERARCHY))); } @@ -204,7 +152,7 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase // make sure that task executor thread finishes all scheduled tasks node1.stop(); - String expectedMessage = HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED(node1.getName(), node1.getGroupName(), "1").toString(); + String expectedMessage = HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED("1").toString(); verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY))); } @@ -227,7 +175,7 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase // make sure that task executor thread finishes all scheduled tasks node1.stop(); - String expectedMessage = HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED(node1.getName(), node1.getGroupName(), "true").toString(); + String expectedMessage = HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED("true").toString(); verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED_LOG_HIERARCHY))); } @@ -254,14 +202,9 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase // make sure that task executor thread finishes all scheduled tasks node2.stop(); - // Verify ADDED message from node2 when its created - String expectedMessage = HighAvailabilityMessages.ADDED(node2.getName(), groupName).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node2.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ADDED_LOG_HIERARCHY))); - // Verify ADDED message from node1 when it discovers node2 has been added - expectedMessage = HighAvailabilityMessages.ADDED(node2.getName(), groupName).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), + String expectedMessage = HighAvailabilityMessages.ADDED(node2.getName(), node2.getAddress()).toString(); + verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())), argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ADDED_LOG_HIERARCHY))); } @@ -292,9 +235,9 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase // make sure that task executor thread finishes all scheduled tasks node1.stop(); - String expectedMessage = HighAvailabilityMessages.DELETED(node2.getName(), groupName).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DELETED_LOG_HIERARCHY))); + String expectedMessage = HighAvailabilityMessages.REMOVED(node2.getName(), node2.getAddress()).toString(); + verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())), + argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.REMOVED_LOG_HIERARCHY))); } public void testRemoteNodeDetached() throws Exception @@ -324,9 +267,9 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase waitForNodeDetachedField(remoteNode, true); // verify that remaining node issues the DETACHED operational logging for remote node - String expectedMessage = HighAvailabilityMessages.DETACHED(node2.getName(), groupName).toString(); - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DETACHED_LOG_HIERARCHY))); + String expectedMessage = HighAvailabilityMessages.LEFT(node2.getName(), node2.getAddress()).toString(); + verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())), + argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.LEFT_LOG_HIERARCHY))); } @@ -361,48 +304,9 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase _helper.assertNodeRole(node2, "REPLICA", "MASTER"); waitForNodeDetachedField(remoteNode, false); - final String expectedMessage = HighAvailabilityMessages.ATTACHED(node2.getName(), groupName, "REPLICA").toString(); - final String expectedMessage2 = HighAvailabilityMessages.ATTACHED(node2.getName(), groupName, "UNKNOWN").toString(); - final String expectedMessage3 = HighAvailabilityMessages.ATTACHED(node2.getName(), groupName, "MASTER").toString(); - ArgumentMatcher<LogMessage> matcher = new ArgumentMatcher<LogMessage>() - { - private String _messageErrorDescription = null; - private String _hierarchyErrorDescription = null; - - @Override - public boolean matches(Object argument) - { - LogMessage logMessage = (LogMessage)argument; - String actualMessage = logMessage.toString(); - boolean expectedMessageMatches = expectedMessage.equals(actualMessage) - || expectedMessage2.equals(actualMessage) || expectedMessage3.equals(actualMessage); - if (!expectedMessageMatches) - { - _messageErrorDescription = "Actual message does not match any expected: " + actualMessage; - } - boolean expectedHierarchyMatches = HighAvailabilityMessages.ATTACHED_LOG_HIERARCHY.equals(logMessage.getLogHierarchy()); - if (!expectedHierarchyMatches) - { - _hierarchyErrorDescription = "Actual hierarchy does not match expected: " + logMessage.getLogHierarchy(); - } - return expectedMessageMatches && expectedHierarchyMatches; - } - - @Override - public void describeTo(Description description) - { - if (_messageErrorDescription != null) - { - description.appendText(_messageErrorDescription); - } - if (_hierarchyErrorDescription != null) - { - description.appendText(_hierarchyErrorDescription); - } - } - }; - verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())), - argThat(matcher)); + final String expectedMessage = HighAvailabilityMessages.JOINED(node2.getName(), node2.getAddress()).toString(); + verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())), + argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.JOINED_LOG_HIERARCHY))); } private void waitForNodeDetachedField(BDBHARemoteReplicationNodeImpl remoteNode, boolean expectedDetached) throws InterruptedException { diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java index e78ef34759..f7dce4f3f5 100644 --- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java @@ -75,9 +75,9 @@ public class GroupCreator private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; - private static final int FAILOVER_CYCLECOUNT = 10; - private static final int FAILOVER_RETRIES = 1; - private static final int FAILOVER_CONNECTDELAY = 1000; + private static final int FAILOVER_CYCLECOUNT = 20; + private static final int FAILOVER_RETRIES = 0; + private static final int FAILOVER_CONNECTDELAY = 500; private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java index c6f005c0e7..63de287be7 100644 --- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java @@ -206,20 +206,6 @@ public class JMXManagementTest extends QpidBrokerTestCase } } - public void testSetDesignatedPrimary() throws Exception - { - int brokerPort = _clusterCreator.getBrokerPortNumbersForNodes().iterator().next(); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPort); - assertFalse("Unexpected designated primary before change", storeBean.getDesignatedPrimary()); - storeBean.setDesignatedPrimary(true); - long limit = System.currentTimeMillis() + 5000; - while(!storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit) - { - Thread.sleep(100l); - } - assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary()); - } - public void testVirtualHostMbeanOnMasterTransfer() throws Exception { Connection connection = getConnection(_brokerFailoverUrl); diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java index 0f8a1609de..248dbb4def 100644 --- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java @@ -20,27 +20,29 @@ package org.apache.qpid.server.store.berkeleydb.replication; import java.io.File; +import java.util.Collections; +import java.util.Map; import javax.jms.Connection; import javax.jms.JMSException; -import javax.management.ObjectName; import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; -import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; import org.apache.qpid.test.utils.QpidBrokerTestCase; public class TwoNodeTest extends QpidBrokerTestCase { private static final String VIRTUAL_HOST = "test"; - private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); private static final int NUMBER_OF_NODES = 2; private final GroupCreator _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); - private ConnectionURL _brokerFailoverUrl; + /** Used when expectation is client will not (re)-connect */ + private ConnectionURL _positiveFailoverUrl; + + /** Used when expectation is client will not (re)-connect */ + private ConnectionURL _negativeFailoverUrl; @Override protected void setUp() throws Exception @@ -54,19 +56,6 @@ public class TwoNodeTest extends QpidBrokerTestCase } @Override - protected void tearDown() throws Exception - { - try - { - _jmxUtils.close(); - } - finally - { - super.tearDown(); - } - } - - @Override public void startBroker() throws Exception { // Don't start default broker provided by QBTC. @@ -77,20 +66,21 @@ public class TwoNodeTest extends QpidBrokerTestCase setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); _groupCreator.configureClusterNodes(); _groupCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); - _brokerFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(); + _positiveFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(); + _negativeFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(200, 0, 2); _groupCreator.startCluster(); } public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception { startCluster(true); - final Connection initialConnection = getConnection(_brokerFailoverUrl); + final Connection initialConnection = getConnection(_positiveFailoverUrl); int masterPort = _groupCreator.getBrokerPortNumberFromConnection(initialConnection); assertProducingConsuming(initialConnection); initialConnection.close(); _groupCreator.stopCluster(); _groupCreator.startNode(masterPort); - final Connection secondConnection = getConnection(_brokerFailoverUrl); + final Connection secondConnection = getConnection(_positiveFailoverUrl); assertProducingConsuming(secondConnection); secondConnection.close(); } @@ -98,12 +88,12 @@ public class TwoNodeTest extends QpidBrokerTestCase public void testClusterRestartWithoutDesignatedPrimary() throws Exception { startCluster(false); - final Connection initialConnection = getConnection(_brokerFailoverUrl); + final Connection initialConnection = getConnection(_positiveFailoverUrl); assertProducingConsuming(initialConnection); initialConnection.close(); _groupCreator.stopCluster(); _groupCreator.startClusterParallel(); - final Connection secondConnection = getConnection(_brokerFailoverUrl); + final Connection secondConnection = getConnection(_positiveFailoverUrl); assertProducingConsuming(secondConnection); secondConnection.close(); } @@ -112,7 +102,7 @@ public class TwoNodeTest extends QpidBrokerTestCase { startCluster(true); _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode()); - final Connection connection = getConnection(_brokerFailoverUrl); + final Connection connection = getConnection(_positiveFailoverUrl); assertNotNull("Expected to get a valid connection to primary", connection); assertProducingConsuming(connection); } @@ -124,7 +114,7 @@ public class TwoNodeTest extends QpidBrokerTestCase try { - Connection connection = getConnection(_brokerFailoverUrl); + Connection connection = getConnection(_negativeFailoverUrl); assertProducingConsuming(connection); fail("Exception not thrown"); } @@ -143,7 +133,7 @@ public class TwoNodeTest extends QpidBrokerTestCase try { - getConnection(_brokerFailoverUrl); + getConnection(_negativeFailoverUrl); fail("Connection not expected"); } catch (JMSException e) @@ -155,41 +145,39 @@ public class TwoNodeTest extends QpidBrokerTestCase public void testInitialDesignatedPrimaryStateOfNodes() throws Exception { startCluster(true); - final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfPrimary()); - assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); - final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfSecondaryNode()); - assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); + Map<String, Object> primaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary()); + assertTrue("Expected primary node to be set as designated primary", + (Boolean) primaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)); + + Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode()); + assertFalse("Expected secondary node to NOT be set as designated primary", + (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)); } public void testSecondaryDesignatedAsPrimaryAfterOriginalPrimaryStopped() throws Exception { startCluster(true); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfSecondaryNode()); + _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary()); - assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); - storeBean.setDesignatedPrimary(true); + Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode()); + assertFalse("Expected node to NOT be set as designated primary", (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)); + + _groupCreator.setNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode(), Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true)); - long limit = System.currentTimeMillis() + 5000; - while( !storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit) + int timeout = 5000; + long limit = System.currentTimeMillis() + timeout; + while( !((Boolean)secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)) && System.currentTimeMillis() < limit) { Thread.sleep(100); + secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode()); } - assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary()); + assertTrue("Expected secondary to transition to primary within " + timeout, (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)); - final Connection connection = getConnection(_brokerFailoverUrl); + final Connection connection = getConnection(_positiveFailoverUrl); assertNotNull("Expected to get a valid connection to new primary", connection); assertProducingConsuming(connection); } - private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( - final int activeBrokerPortNumber) throws Exception - { - _jmxUtils.open(activeBrokerPortNumber); - - ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); - return storeBean; - } - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index fa4e3f21dd..597fc44e4c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -281,4 +281,10 @@ class HeadersBinding return true; } + + @Override + public int hashCode() + { + return _binding == null ? 0 : _binding.hashCode(); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java index 9e497efcd2..b864a8c095 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java @@ -44,15 +44,15 @@ public class HighAvailabilityMessages private static Locale _currentLocale = BrokerProperties.getLocale(); public static final String HIGHAVAILABILITY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability"; - public static final String STOPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.stopped"; public static final String INTRUDER_DETECTED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.intruder_detected"; - public static final String STARTED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.started"; public static final String TRANSFER_MASTER_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.transfer_master"; public static final String QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.quorum_override_changed"; - public static final String DETACHED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.detached"; - public static final String MAJORITY_LOST_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.majority_lost"; + public static final String REMOVED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.removed"; + public static final String LEFT_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.left"; + public static final String JOINED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.joined"; + public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.created"; + public static final String QUORUM_LOST_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.quorum_lost"; public static final String PRIORITY_CHANGED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.priority_changed"; - public static final String ATTACHED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.attached"; public static final String ADDED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.added"; public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.deleted"; public static final String ROLE_CHANGED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.role_changed"; @@ -61,15 +61,15 @@ public class HighAvailabilityMessages static { Logger.getLogger(HIGHAVAILABILITY_LOG_HIERARCHY); - Logger.getLogger(STOPPED_LOG_HIERARCHY); Logger.getLogger(INTRUDER_DETECTED_LOG_HIERARCHY); - Logger.getLogger(STARTED_LOG_HIERARCHY); Logger.getLogger(TRANSFER_MASTER_LOG_HIERARCHY); Logger.getLogger(QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY); - Logger.getLogger(DETACHED_LOG_HIERARCHY); - Logger.getLogger(MAJORITY_LOST_LOG_HIERARCHY); + Logger.getLogger(REMOVED_LOG_HIERARCHY); + Logger.getLogger(LEFT_LOG_HIERARCHY); + Logger.getLogger(JOINED_LOG_HIERARCHY); + Logger.getLogger(CREATED_LOG_HIERARCHY); + Logger.getLogger(QUORUM_LOST_LOG_HIERARCHY); Logger.getLogger(PRIORITY_CHANGED_LOG_HIERARCHY); - Logger.getLogger(ATTACHED_LOG_HIERARCHY); Logger.getLogger(ADDED_LOG_HIERARCHY); Logger.getLogger(DELETED_LOG_HIERARCHY); Logger.getLogger(ROLE_CHANGED_LOG_HIERARCHY); @@ -80,14 +80,14 @@ public class HighAvailabilityMessages /** * Log a HighAvailability message of the Format: - * <pre>HA-1012 : The node ''{0}'' from the replication group ''{1}'' is stopped.</pre> + * <pre>HA-1008 : Intruder detected : Node ''{0}'' ({1})</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * */ - public static LogMessage STOPPED(String param1, String param2) + public static LogMessage INTRUDER_DETECTED(String param1, String param2) { - String rawMessage = _messages.getString("STOPPED"); + String rawMessage = _messages.getString("INTRUDER_DETECTED"); final Object[] messageArguments = {param1, param2}; // Create a new MessageFormat to ensure thread safety. @@ -105,23 +105,23 @@ public class HighAvailabilityMessages public String getLogHierarchy() { - return STOPPED_LOG_HIERARCHY; + return INTRUDER_DETECTED_LOG_HIERARCHY; } }; } /** * Log a HighAvailability message of the Format: - * <pre>HA-1007: Intruder node ''{0}'' from ''{1}'' is detected in replication group ''{2}''</pre> + * <pre>HA-1007 : Master transfer requested : to ''{0}'' ({1})</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * */ - public static LogMessage INTRUDER_DETECTED(String param1, String param2, String param3) + public static LogMessage TRANSFER_MASTER(String param1, String param2) { - String rawMessage = _messages.getString("INTRUDER_DETECTED"); + String rawMessage = _messages.getString("TRANSFER_MASTER"); - final Object[] messageArguments = {param1, param2, param3}; + final Object[] messageArguments = {param1, param2}; // Create a new MessageFormat to ensure thread safety. // Sharing a MessageFormat and using applyPattern is not thread safe MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); @@ -137,23 +137,23 @@ public class HighAvailabilityMessages public String getLogHierarchy() { - return INTRUDER_DETECTED_LOG_HIERARCHY; + return TRANSFER_MASTER_LOG_HIERARCHY; } }; } /** * Log a HighAvailability message of the Format: - * <pre>HA-1013 : The node ''{0}'' from the replication group ''{1}'' is started.</pre> + * <pre>HA-1011 : Minimum group : {0}</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * */ - public static LogMessage STARTED(String param1, String param2) + public static LogMessage QUORUM_OVERRIDE_CHANGED(String param1) { - String rawMessage = _messages.getString("STARTED"); + String rawMessage = _messages.getString("QUORUM_OVERRIDE_CHANGED"); - final Object[] messageArguments = {param1, param2}; + final Object[] messageArguments = {param1}; // Create a new MessageFormat to ensure thread safety. // Sharing a MessageFormat and using applyPattern is not thread safe MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); @@ -169,23 +169,23 @@ public class HighAvailabilityMessages public String getLogHierarchy() { - return STARTED_LOG_HIERARCHY; + return QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY; } }; } /** * Log a HighAvailability message of the Format: - * <pre>HA-1014 : Transfer master to ''{0}'' is requested on node ''{1}'' from the replication group ''{2}''.</pre> + * <pre>HA-1004 : Removed : Node : ''{0}'' ({1})</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * */ - public static LogMessage TRANSFER_MASTER(String param1, String param2, String param3) + public static LogMessage REMOVED(String param1, String param2) { - String rawMessage = _messages.getString("TRANSFER_MASTER"); + String rawMessage = _messages.getString("REMOVED"); - final Object[] messageArguments = {param1, param2, param3}; + final Object[] messageArguments = {param1, param2}; // Create a new MessageFormat to ensure thread safety. // Sharing a MessageFormat and using applyPattern is not thread safe MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); @@ -201,23 +201,23 @@ public class HighAvailabilityMessages public String getLogHierarchy() { - return TRANSFER_MASTER_LOG_HIERARCHY; + return REMOVED_LOG_HIERARCHY; } }; } /** * Log a HighAvailability message of the Format: - * <pre>HA-1009 : The quorum override attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''.</pre> + * <pre>HA-1006 : Left : Node : ''{0}'' ({1})</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * */ - public static LogMessage QUORUM_OVERRIDE_CHANGED(String param1, String param2, String param3) + public static LogMessage LEFT(String param1, String param2) { - String rawMessage = _messages.getString("QUORUM_OVERRIDE_CHANGED"); + String rawMessage = _messages.getString("LEFT"); - final Object[] messageArguments = {param1, param2, param3}; + final Object[] messageArguments = {param1, param2}; // Create a new MessageFormat to ensure thread safety. // Sharing a MessageFormat and using applyPattern is not thread safe MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); @@ -233,21 +233,21 @@ public class HighAvailabilityMessages public String getLogHierarchy() { - return QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY; + return LEFT_LOG_HIERARCHY; } }; } /** * Log a HighAvailability message of the Format: - * <pre>HA-1003 : The node ''{0}'' detached from the replication group ''{1}''.</pre> + * <pre>HA-1005 : Joined : Node : ''{0}'' ({1})</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * */ - public static LogMessage DETACHED(String param1, String param2) + public static LogMessage JOINED(String param1, String param2) { - String rawMessage = _messages.getString("DETACHED"); + String rawMessage = _messages.getString("JOINED"); final Object[] messageArguments = {param1, param2}; // Create a new MessageFormat to ensure thread safety. @@ -265,28 +265,23 @@ public class HighAvailabilityMessages public String getLogHierarchy() { - return DETACHED_LOG_HIERARCHY; + return JOINED_LOG_HIERARCHY; } }; } /** * Log a HighAvailability message of the Format: - * <pre>HA-1006 : A majority of nodes from replication group ''{0}'' is not available for node ''{1}''.</pre> + * <pre>HA-1001 : Created</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * */ - public static LogMessage MAJORITY_LOST(String param1, String param2) + public static LogMessage CREATED() { - String rawMessage = _messages.getString("MAJORITY_LOST"); + String rawMessage = _messages.getString("CREATED"); - final Object[] messageArguments = {param1, param2}; - // Create a new MessageFormat to ensure thread safety. - // Sharing a MessageFormat and using applyPattern is not thread safe - MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); - - final String message = formatter.format(messageArguments); + final String message = rawMessage; return new LogMessage() { @@ -297,28 +292,23 @@ public class HighAvailabilityMessages public String getLogHierarchy() { - return MAJORITY_LOST_LOG_HIERARCHY; + return CREATED_LOG_HIERARCHY; } }; } /** * Log a HighAvailability message of the Format: - * <pre>HA-1008 : The priority attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''.</pre> + * <pre>HA-1009 : Insufficient replicas contactable</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * */ - public static LogMessage PRIORITY_CHANGED(String param1, String param2, String param3) + public static LogMessage QUORUM_LOST() { - String rawMessage = _messages.getString("PRIORITY_CHANGED"); + String rawMessage = _messages.getString("QUORUM_LOST"); - final Object[] messageArguments = {param1, param2, param3}; - // Create a new MessageFormat to ensure thread safety. - // Sharing a MessageFormat and using applyPattern is not thread safe - MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); - - final String message = formatter.format(messageArguments); + final String message = rawMessage; return new LogMessage() { @@ -329,23 +319,23 @@ public class HighAvailabilityMessages public String getLogHierarchy() { - return PRIORITY_CHANGED_LOG_HIERARCHY; + return QUORUM_LOST_LOG_HIERARCHY; } }; } /** * Log a HighAvailability message of the Format: - * <pre>HA-1004 : The node ''{0}'' attached to the replication group ''{1}'' with role ''{2}''.</pre> + * <pre>HA-1012 : Priority : {0}</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * */ - public static LogMessage ATTACHED(String param1, String param2, String param3) + public static LogMessage PRIORITY_CHANGED(String param1) { - String rawMessage = _messages.getString("ATTACHED"); + String rawMessage = _messages.getString("PRIORITY_CHANGED"); - final Object[] messageArguments = {param1, param2, param3}; + final Object[] messageArguments = {param1}; // Create a new MessageFormat to ensure thread safety. // Sharing a MessageFormat and using applyPattern is not thread safe MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); @@ -361,14 +351,14 @@ public class HighAvailabilityMessages public String getLogHierarchy() { - return ATTACHED_LOG_HIERARCHY; + return PRIORITY_CHANGED_LOG_HIERARCHY; } }; } /** * Log a HighAvailability message of the Format: - * <pre>HA-1001 : A new node ''{0}'' is added into a replication group ''{1}''.</pre> + * <pre>HA-1003 : Added : Node : ''{0}'' ({1})</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * @@ -400,21 +390,16 @@ public class HighAvailabilityMessages /** * Log a HighAvailability message of the Format: - * <pre>HA-1002 : An existing node ''{0}'' is removed from the replication group ''{1}''.</pre> + * <pre>HA-1002 : Deleted</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * */ - public static LogMessage DELETED(String param1, String param2) + public static LogMessage DELETED() { String rawMessage = _messages.getString("DELETED"); - final Object[] messageArguments = {param1, param2}; - // Create a new MessageFormat to ensure thread safety. - // Sharing a MessageFormat and using applyPattern is not thread safe - MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); - - final String message = formatter.format(messageArguments); + final String message = rawMessage; return new LogMessage() { @@ -432,7 +417,7 @@ public class HighAvailabilityMessages /** * Log a HighAvailability message of the Format: - * <pre>HA-1005 : The role of the node ''{0}'' from the replication group ''{1}'' has changed from ''{2}'' to ''{3}''.</pre> + * <pre>HA-1010 : Role change reported: Node : ''{0}'' ({1}) : from ''{2}'' to ''{3}''</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * @@ -464,16 +449,16 @@ public class HighAvailabilityMessages /** * Log a HighAvailability message of the Format: - * <pre>HA-1010 : The designated primary attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''.</pre> + * <pre>HA-1013 : Designated primary : {0}</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * */ - public static LogMessage DESIGNATED_PRIMARY_CHANGED(String param1, String param2, String param3) + public static LogMessage DESIGNATED_PRIMARY_CHANGED(String param1) { String rawMessage = _messages.getString("DESIGNATED_PRIMARY_CHANGED"); - final Object[] messageArguments = {param1, param2, param3}; + final Object[] messageArguments = {param1}; // Create a new MessageFormat to ensure thread safety. // Sharing a MessageFormat and using applyPattern is not thread safe MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties index 94df7cc38b..3c5b0d260f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties @@ -18,18 +18,47 @@ # # HA logging message i18n strings. -ADDED=HA-1001 : A new node ''{0}'' is added into a replication group ''{1}''. -DELETED=HA-1002 : An existing node ''{0}'' is removed from the replication group ''{1}''. -DETACHED=HA-1003 : The node ''{0}'' detached from the replication group ''{1}''. -ATTACHED=HA-1004 : The node ''{0}'' attached to the replication group ''{1}'' with role ''{2}''. -ROLE_CHANGED=HA-1005 : The role of the node ''{0}'' from the replication group ''{1}'' has changed from ''{2}'' to ''{3}''. -MAJORITY_LOST=HA-1006 : A majority of nodes from replication group ''{0}'' is not available for node ''{1}''. -INTRUDER_DETECTED=HA-1007: Intruder node ''{0}'' from ''{1}'' is detected in replication group ''{2}'' -PRIORITY_CHANGED=HA-1008 : The priority attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''. -QUORUM_OVERRIDE_CHANGED=HA-1009 : The quorum override attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''. -DESIGNATED_PRIMARY_CHANGED=HA-1010 : The designated primary attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''. -STOPPED=HA-1011 : The node ''{0}'' from the replication group ''{1}'' is stopped. -STARTED=HA-1012 : The node ''{0}'' from the replication group ''{1}'' is started. -TRANSFER_MASTER=HA-1013 : Transfer master to ''{0}'' is requested on node ''{1}'' from the replication group ''{2}''. +CREATED = HA-1001 : Created +DELETED = HA-1002 : Deleted + +# 0 - Node name +# 1 - Node address +ADDED = HA-1003 : Added : Node : ''{0}'' ({1}) + +# 0 - Node name +# 1 - Node address +REMOVED = HA-1004 : Removed : Node : ''{0}'' ({1}) + +# 0 - Node name +# 1 - Node address +JOINED = HA-1005 : Joined : Node : ''{0}'' ({1}) + +# 0 - Node name +# 1 - Node address +LEFT = HA-1006 : Left : Node : ''{0}'' ({1}) + +# 0 - Node name +# 1 - Node address +TRANSFER_MASTER = HA-1007 : Master transfer requested : to ''{0}'' ({1}) + +# 0 - Node name +# 1 - Node address +INTRUDER_DETECTED = HA-1008 : Intruder detected : Node ''{0}'' ({1}) +QUORUM_LOST = HA-1009 : Insufficient replicas contactable + +# 0 - Node name +# 1 - Node address +# 2 - Previous role value +# 3 - New role value +ROLE_CHANGED = HA-1010 : Role change reported: Node : ''{0}'' ({1}) : from ''{2}'' to ''{3}'' + +# 0 - new value +QUORUM_OVERRIDE_CHANGED = HA-1011 : Minimum group : {0} + +# 0 - new value +PRIORITY_CHANGED = HA-1012 : Priority : {0} + +# 0 - new value +DESIGNATED_PRIMARY_CHANGED = HA-1013 : Designated primary : {0} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java index edb78369ae..d59a09fce9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java @@ -116,5 +116,4 @@ public class LogSubjectFormat */ public static final String QUEUE_FORMAT = "vh(/{0})/qu({1})"; - public static final String VIRTUAL_HOST_NODE_FORMAT = "vhn(/{0}))"; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 6c8945582c..f944821c6f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -318,7 +318,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im checkCandidate((Class<? extends ConfiguredObject>) interfaceClass, candidates); } } - if(clazz.getSuperclass() != null & ConfiguredObject.class.isAssignableFrom(clazz.getSuperclass())) + if(clazz.getSuperclass() != null && ConfiguredObject.class.isAssignableFrom(clazz.getSuperclass())) { findBestFitInterface((Class<? extends ConfiguredObject>) clazz.getSuperclass(), candidates); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java index 0f4ecb09dc..b0dda69ee6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java @@ -186,6 +186,7 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>> ConfiguredObjectRecordConverter converter = new ConfiguredObjectRecordConverter(BrokerModel.getInstance()); Reader reader; + try { URL url = new URL(initialConfigurationLocation); @@ -196,9 +197,18 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>> reader = new FileReader(initialConfigurationLocation); } - Collection<ConfiguredObjectRecord> records = converter.readFromJson(org.apache.qpid.server.model.Broker.class, - systemConfig, reader); - return records.toArray(new ConfiguredObjectRecord[records.size()]); + try + { + Collection<ConfiguredObjectRecord> records = + converter.readFromJson(org.apache.qpid.server.model.Broker.class, + systemConfig, reader); + return records.toArray(new ConfiguredObjectRecord[records.size()]); + } + finally + { + reader.close(); + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 545a1d941d..c49c2790df 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -2925,7 +2925,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if(existingPolicy != _exclusive) { ExclusivityPolicy newPolicy = _exclusive; - _exclusive = newPolicy; + _exclusive = existingPolicy; updateExclusivityPolicy(newPolicy); } return true; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 57142e6e1f..d0acf1a46f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -1564,7 +1564,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore data = new byte[0]; } } - return ByteBuffer.wrap(data,offsetInMessage,size); + return ByteBuffer.wrap(data,offsetInMessage,Math.min(size,data.length-offsetInMessage)); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java index 38101525cd..ad9df793c8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java @@ -24,7 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.logging.subjects.VirtualHostNodeLogSubject; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -54,7 +53,6 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< private final Broker<?> _broker; private final AtomicReference<State> _state = new AtomicReference<State>(State.UNINITIALIZED); private final EventLogger _eventLogger; - private final VirtualHostNodeLogSubject _virtualHostNodeLogSubject; private DurableConfigurationStore _durableConfigurationStore; @@ -67,7 +65,6 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< _broker = parent; SystemConfig<?> systemConfig = _broker.getParent(SystemConfig.class); _eventLogger = systemConfig.getEventLogger(); - _virtualHostNodeLogSubject = new VirtualHostNodeLogSubject(getName()); } @@ -248,8 +245,4 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< protected abstract void activate(); - public VirtualHostNodeLogSubject getVirtualHostNodeLogSubject() - { - return _virtualHostNodeLogSubject; - } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 7877812d84..49bc26149e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -474,13 +474,12 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } else { - getVirtualHost().getEventLogger().message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(), - _currentMessage.getMessagePublishInfo().getRoutingKey() - == null - ? null - : _currentMessage.getMessagePublishInfo() - .getRoutingKey() - .toString())); + AMQShortString exchangeName = _currentMessage.getExchangeName(); + AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey(); + + getVirtualHost().getEventLogger().message( + ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(), + routingKey == null ? null : routingKey.asString())); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java index 1bd9ab079e..c33af48d8e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java @@ -158,7 +158,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap acknowledged.add(instance); } } - return ackedMessageMap.values(); + return acknowledged; } private void collect(long key, Map<Long, MessageInstance> msgs) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java new file mode 100644 index 0000000000..ca52173e66 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collection; + +import junit.framework.TestCase; + +import org.apache.qpid.server.message.MessageInstance; + +public class UnacknowledgedMessageMapTest extends TestCase +{ + public void testDeletedMessagesCantBeAcknowledged() + { + UnacknowledgedMessageMap map = new UnacknowledgedMessageMapImpl(100); + final int expectedSize = 5; + MessageInstance[] msgs = populateMap(map,expectedSize); + assertEquals(expectedSize,map.size()); + Collection<MessageInstance> acknowledged = map.acknowledge(100, true); + assertEquals(expectedSize, acknowledged.size()); + assertEquals(0,map.size()); + for(int i = 0; i < expectedSize; i++) + { + assertTrue("Message " + i + " is missing", acknowledged.contains(msgs[i])); + } + + map = new UnacknowledgedMessageMapImpl(100); + msgs = populateMap(map,expectedSize); + // simulate some messages being ttl expired + when(msgs[2].lockAcquisition()).thenReturn(Boolean.FALSE); + when(msgs[4].lockAcquisition()).thenReturn(Boolean.FALSE); + + assertEquals(expectedSize,map.size()); + + + acknowledged = map.acknowledge(100, true); + assertEquals(expectedSize-2, acknowledged.size()); + assertEquals(0,map.size()); + for(int i = 0; i < expectedSize; i++) + { + assertEquals(i != 2 && i != 4, acknowledged.contains(msgs[i])); + } + + } + + public MessageInstance[] populateMap(final UnacknowledgedMessageMap map, int size) + { + MessageInstance[] msgs = new MessageInstance[size]; + for(int i = 0; i < size; i++) + { + msgs[i] = createMessageInstance(i); + map.add((long)i,msgs[i]); + } + return msgs; + } + + private MessageInstance createMessageInstance(final int id) + { + MessageInstance instance = mock(MessageInstance.class); + when(instance.lockAcquisition()).thenReturn(Boolean.TRUE); + return instance; + } +} diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java index 3974ab0af6..266f3b6868 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java @@ -20,10 +20,32 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedByte; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.UnsignedShort; import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.messaging.Data; @@ -32,17 +54,6 @@ import org.apache.qpid.transport.codec.BBEncoder; import org.apache.qpid.typedmessage.TypedBytesContentWriter; import org.apache.qpid.typedmessage.TypedBytesFormatException; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; - public class MessageConverter_from_1_0 { private static final Charset UTF_8 = Charset.forName("UTF-8"); @@ -91,7 +102,7 @@ public class MessageConverter_from_1_0 Section firstBodySection = sections.get(0); if(firstBodySection instanceof AmqpValue) { - bodyObject = fixObject(((AmqpValue)firstBodySection).getValue()); + bodyObject = convertValue(((AmqpValue)firstBodySection).getValue()); } else if(firstBodySection instanceof Data) { @@ -115,7 +126,7 @@ public class MessageConverter_from_1_0 { totalSequence.addAll(((AmqpSequence)section).getValue()); } - bodyObject = fixObject(totalSequence); + bodyObject = convertValue(totalSequence); } } @@ -127,40 +138,94 @@ public class MessageConverter_from_1_0 return bodyObject; } - private static Object fixObject(final Object value) + private static final Set<Class> STANDARD_TYPES = new HashSet<>(Arrays.<Class>asList(Boolean.class, + Byte.class, + Short.class, + Integer.class, + Long.class, + Float.class, + Double.class, + Character.class, + String.class, + byte[].class, + UUID.class)); + + private static Map convertMap(final Map map) { - if(value instanceof Binary) + Map resultMap = new LinkedHashMap(); + Iterator<Map.Entry> iterator = map.entrySet().iterator(); + while(iterator.hasNext()) { - final Binary binaryValue = (Binary) value; - byte[] data = new byte[binaryValue.getLength()]; - binaryValue.asByteBuffer().get(data); - return data; + Map.Entry entry = iterator.next(); + resultMap.put(convertValue(entry.getKey()), convertValue(entry.getValue())); + } - else if(value instanceof List) + return resultMap; + } + + public static Object convertValue(final Object value) + { + if(value != null && !STANDARD_TYPES.contains(value)) { - List listValue = (List) value; - List fixedValue = new ArrayList(listValue.size()); - for(Object o : listValue) + if(value instanceof Map) { - fixedValue.add(fixObject(o)); + return convertMap((Map)value); } - return fixedValue; - } - else if(value instanceof Map) - { - Map<?,?> mapValue = (Map) value; - Map fixedValue = new LinkedHashMap(mapValue.size()); - for(Map.Entry<?,?> entry : mapValue.entrySet()) + else if(value instanceof List) + { + return convertList((List)value); + } + else if(value instanceof UnsignedByte) + { + return ((UnsignedByte)value).shortValue(); + } + else if(value instanceof UnsignedShort) + { + return ((UnsignedShort)value).intValue(); + } + else if(value instanceof UnsignedInteger) + { + return ((UnsignedInteger)value).longValue(); + } + else if(value instanceof UnsignedLong) + { + return ((UnsignedLong)value).longValue(); + } + else if(value instanceof Symbol) + { + return value.toString(); + } + else if(value instanceof Date) { - fixedValue.put(fixObject(entry.getKey()),fixObject(entry.getValue())); + return ((Date)value).getTime(); + } + else if(value instanceof Binary) + { + Binary binary = (Binary)value; + byte[] data = new byte[binary.getLength()]; + binary.asByteBuffer().get(data); + return data; + } + else + { + // Throw exception instead? + return value.toString(); } - return fixedValue; } else { return value; } + } + private static List convertList(final List list) + { + List result = new ArrayList(list.size()); + for(Object entry : list) + { + result.add(convertValue(entry)); + } + return result; } public static byte[] convertToBody(Object object) diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index 8d77a8cfaf..54d4638bb8 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.converter.v0_10_v1_0; import java.nio.ByteBuffer; +import java.util.Map; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.plugin.MessageConverter; @@ -176,7 +177,8 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 messageProps.setReplyTo(replyTo); } - messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeadersAsMap()); + messageProps.setApplicationHeaders((Map<String, Object>) MessageConverter_from_1_0.convertValue(serverMsg.getMessageHeader() + .getHeadersAsMap())); Header header = new Header(deliveryProps, messageProps, null); return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java index 5b1c25e879..2de21e1a9f 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java @@ -194,7 +194,7 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ for(String headerName : serverMsg.getMessageHeader().getHeaderNames()) { - headerProps.put(headerName, serverMsg.getMessageHeader().getHeader(headerName)); + headerProps.put(headerName, MessageConverter_from_1_0.convertValue(serverMsg.getMessageHeader().getHeader(headerName))); } props.setHeaders(FieldTable.convertToFieldTable(headerProps)); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/DefinedFileServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/DefinedFileServlet.java index d8f8e4e4b0..2f6f55e042 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/DefinedFileServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/DefinedFileServlet.java @@ -61,23 +61,24 @@ public class DefinedFileServlet extends HttpServlet { try (OutputStream output = HttpManagementUtil.getOutputStream(request, response)) { - InputStream fileInput = getClass().getResourceAsStream("/resources/" + _filename); - - if (fileInput != null) + try(InputStream fileInput = getClass().getResourceAsStream("/resources/" + _filename)) { - byte[] buffer = new byte[1024]; - response.setStatus(HttpServletResponse.SC_OK); - int read = 0; + if (fileInput != null) + { + byte[] buffer = new byte[1024]; + response.setStatus(HttpServletResponse.SC_OK); + int read = 0; - while ((read = fileInput.read(buffer)) > 0) + while ((read = fileInput.read(buffer)) > 0) + { + output.write(buffer, 0, read); + } + } + else { - output.write(buffer, 0, read); + response.sendError(HttpServletResponse.SC_NOT_FOUND, "unknown file: " + _filename); } } - else - { - response.sendError(HttpServletResponse.SC_NOT_FOUND, "unknown file: " + _filename); - } } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 945645ccb1..35252204ac 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2101,7 +2101,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void deregisterProducer(long producerId) { - _producers.remove(new Long(producerId)); + _producers.remove(producerId); } boolean isInRecovery() @@ -2935,7 +2935,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void registerProducer(long producerId, MessageProducer producer) { - _producers.put(new Long(producerId), producer); + _producers.put(producerId, producer); } /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 3ff7416d8f..dbbc300910 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -26,6 +26,14 @@ import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTRO import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE; import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,13 +60,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.transport.TransportException; -import javax.jms.Destination; -import javax.jms.JMSException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; - public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { /** Used for debugging. */ @@ -736,10 +737,10 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe boolean isConsumer, boolean noLocal) throws AMQException { - throw new UnsupportedOperationException("The new addressing based syntax is " - + "not supported for AMQP 0-8/0-9 versions"); + throw new UnsupportedAddressSyntaxException(dest); } + protected void flushAcknowledgments() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 3cb723e5a8..b9bb03444f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -20,6 +20,17 @@ */ package org.apache.qpid.client; +import java.nio.ByteBuffer; +import java.util.UUID; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.Topic; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -33,16 +44,6 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.MethodRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Queue; -import javax.jms.Topic; - -import java.nio.ByteBuffer; -import java.util.UUID; public class BasicMessageProducer_0_8 extends BasicMessageProducer { @@ -57,6 +58,12 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer void declareDestination(AMQDestination destination) { + + if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + throw new UnsupportedAddressSyntaxException(destination); + } + if(getSession().isDeclareExchanges()) { final MethodRegistry methodRegistry = getSession().getMethodRegistry(); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java b/qpid/java/client/src/main/java/org/apache/qpid/client/UnsupportedAddressSyntaxException.java index 1291380311..c65fd7c189 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/UnsupportedAddressSyntaxException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,35 +18,15 @@ * under the License. * */ -package org.apache.qpid.server.util; - -import java.util.concurrent.Callable; +package org.apache.qpid.client; -public abstract class TimedRun implements Callable<Long> +class UnsupportedAddressSyntaxException extends UnsupportedOperationException { - private final String description; - - public TimedRun(String description) + UnsupportedAddressSyntaxException(final AMQDestination dest) { - this.description = description; + super("The address '" + dest.toString() + "' uses the " + AMQDestination.DestSyntax.ADDR + " addressing syntax" + + " which is not supported for AMQP 0-8/0-9/0-9-1 connections. Use the " + AMQDestination.DestSyntax.BURL + + " syntax instead:\n" + + "\tBURL:<Exchange Class>://<Exchange Name>/[<Destination>]/[<Queue>][?<option>='<value>'[&<option>='<value>']]\n"); } - - public Long call() throws Exception - { - setup(); - long start = System.currentTimeMillis(); - run(); - long stop = System.currentTimeMillis(); - teardown(); - return stop - start; - } - - public String toString() - { - return description; - } - - protected void setup() throws Exception{} - protected void teardown() throws Exception{} - protected abstract void run() throws Exception; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index b039d8b005..808b2781c4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.jms; -import org.apache.qpid.transport.ConnectionSettings; - import java.util.Map; +import org.apache.qpid.transport.ConnectionSettings; + public interface BrokerDetails { @@ -104,9 +104,5 @@ public interface BrokerDetails boolean getBooleanProperty(String propName); - String toString(); - - boolean equals(Object o); - ConnectionSettings buildConnectionSettings(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java index 978ad3af7d..9d8509b303 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java @@ -50,7 +50,7 @@ public class ConstantExpression implements Expression public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE); public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE); - private Object value; + private Object _value; public static ConstantExpression createFromDecimal(String text) { @@ -64,7 +64,7 @@ public class ConstantExpression implements Expression Number value; try { - value = new Long(text); + value = Long.valueOf(text); } catch (NumberFormatException e) { @@ -114,17 +114,17 @@ public class ConstantExpression implements Expression public ConstantExpression(Object value) { - this.value = value; + this._value = value; } public Object evaluate(FilterableMessage message) { - return value; + return _value; } public Object getValue() { - return value; + return _value; } /** @@ -132,22 +132,22 @@ public class ConstantExpression implements Expression */ public String toString() { - if (value == null) + if (_value == null) { return "NULL"; } - if (value instanceof Boolean) + if (_value instanceof Boolean) { - return ((Boolean) value) ? "TRUE" : "FALSE"; + return ((Boolean) _value) ? "TRUE" : "FALSE"; } - if (value instanceof String) + if (_value instanceof String) { - return encodeString((String) value); + return encodeString((String) _value); } - return value.toString(); + return _value.toString(); } /** @@ -186,7 +186,7 @@ public class ConstantExpression implements Expression */ public static String encodeString(String s) { - StringBuffer b = new StringBuffer(); + StringBuilder b = new StringBuilder(); b.append('\''); for (int i = 0; i < s.length(); i++) { diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/PlainSaslClient.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/PlainSaslClient.java index 01af41c2c9..bee89fcc66 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/PlainSaslClient.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/PlainSaslClient.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.management.common.sasl; +import java.io.IOException; +import java.io.UnsupportedEncodingException; + import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; @@ -28,8 +31,6 @@ import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; -import java.io.IOException; -import java.io.UnsupportedEncodingException; public class PlainSaslClient implements SaslClient { @@ -170,9 +171,10 @@ public class PlainSaslClient implements SaslClient clearPassword(); } - protected void finalize() + protected void finalize() throws Throwable { clearPassword(); + super.finalize(); } private Object[] getUserInfo() throws SaslException diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UserPasswordCallbackHandler.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UserPasswordCallbackHandler.java index caee5d481a..3778e39f62 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UserPasswordCallbackHandler.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UserPasswordCallbackHandler.java @@ -19,12 +19,13 @@ */ package org.apache.qpid.management.common.sasl; +import java.io.IOException; + import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; -import java.io.IOException; public class UserPasswordCallbackHandler implements CallbackHandler { @@ -70,8 +71,9 @@ public class UserPasswordCallbackHandler implements CallbackHandler } } - protected void finalize() + protected void finalize() throws Throwable { clearPassword(); + super.finalize(); } } diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UsernameHashedPasswordCallbackHandler.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UsernameHashedPasswordCallbackHandler.java index 314ef70144..5f12fa9dfc 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UsernameHashedPasswordCallbackHandler.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UsernameHashedPasswordCallbackHandler.java @@ -20,15 +20,16 @@ */ package org.apache.qpid.management.common.sasl; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; public class UsernameHashedPasswordCallbackHandler implements CallbackHandler @@ -76,9 +77,10 @@ public class UsernameHashedPasswordCallbackHandler implements CallbackHandler } } - protected void finalize() + protected void finalize() throws Throwable { clearPassword(); + super.finalize(); } public static char[] getHash(String text) throws NoSuchAlgorithmException, UnsupportedEncodingException diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java index e0ae137c35..1cb938e915 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java @@ -102,27 +102,27 @@ public abstract class NumericGeneratedPropertySupport extends GeneratedPropertyS Number result = null; if (targetType == double.class) { - result = new Double(value); + result = value; } else if (targetType == float.class) { - result = new Float(value); + result = (float) value; } else if (targetType == int.class) { - result = new Integer((int) value); + result = (int) value; } else if (targetType == long.class) { - result = new Long((long) value); + result = (long) value; } else if (targetType == short.class) { - result = new Short((short) value); + result = (short) value; } else if (targetType == byte.class) { - result = new Byte((byte) value); + result = (byte) value; } else { diff --git a/qpid/java/pom.xml b/qpid/java/pom.xml index 1be2ce0f95..b30472ce88 100644 --- a/qpid/java/pom.xml +++ b/qpid/java/pom.xml @@ -54,7 +54,6 @@ <qpid.home.qbtc.output>${qpid.home}${file.separator}target${file.separator}qbtc-output</qpid.home.qbtc.output> <!-- override for broker tests --> <qpid.work>${project.build.directory}${file.separator}QPID_WORK</qpid.work> - <argLine /> <profile>java-mms.0-10</profile> <profile.broker.language>java</profile.broker.language> <profile.broker.type>internal</profile.broker.type> diff --git a/qpid/java/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java index 5a3c0182fd..845e7e58c3 100644 --- a/qpid/java/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -90,16 +90,18 @@ public class QpidTestCase extends TestCase if (file.exists()) { _logger.info("Using exclude file: " + uri); - try + try(FileReader fileReader = new FileReader(file)) { - BufferedReader in = new BufferedReader(new FileReader(file)); - String excludedTest = in.readLine(); - do + try(BufferedReader in = new BufferedReader(fileReader)) { - exclusionList.add(excludedTest); - excludedTest = in.readLine(); + String excludedTest = in.readLine(); + do + { + exclusionList.add(excludedTest); + excludedTest = in.readLine(); + } + while (excludedTest != null); } - while (excludedTest != null); } catch (IOException e) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java index fce47f9986..70e1b27fba 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock; import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; import java.net.MalformedURLException; @@ -114,7 +115,10 @@ public class TestBrokerConfiguration try { URL url = new URL(initialStoreLocation); - reader = new InputStreamReader(url.openStream()); + try(InputStream urlStream = url.openStream()) + { + reader = new InputStreamReader(urlStream); + } } catch (MalformedURLException e) { @@ -122,6 +126,8 @@ public class TestBrokerConfiguration } Collection<ConfiguredObjectRecord> records = converter.readFromJson(org.apache.qpid.server.model.Broker.class, parentObject, reader); + reader.close(); + _store = new AbstractMemoryStore(Broker.class){}; ConfiguredObjectRecord[] initialRecords = records.toArray(new ConfiguredObjectRecord[records.size()]); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java deleted file mode 100644 index 941c1d9499..0000000000 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.util; - -import java.util.Collection; -import java.util.concurrent.Callable; - -import org.apache.log4j.Logger; - -public class AveragedRun implements Callable<RunStats> -{ - private static final Logger _logger = Logger.getLogger(AveragedRun.class); - - private final RunStats stats = new RunStats(); - private final TimedRun test; - private final int iterations; - - public AveragedRun(TimedRun test, int iterations) - { - this.test = test; - this.iterations = iterations; - } - - public RunStats call() throws Exception - { - for (int i = 0; i < iterations; i++) - { - stats.record(test.call()); - } - return stats; - } - - public void run() throws Exception - { - _logger.info(test + ": " + call()); - } - - public String toString() - { - return test.toString(); - } - - static void run(Collection<AveragedRun> tests) throws Exception - { - for(AveragedRun test : tests) - { - test.run(); - } - } -} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/ConversationFactory.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/ConversationFactory.java deleted file mode 100644 index 3a9354d822..0000000000 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/ConversationFactory.java +++ /dev/null @@ -1,484 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.utils; - -import org.apache.log4j.Logger; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -/** - * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation - * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant - * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids. - * - * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a - * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation - * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order - * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded - * conversation (the conversation methods can be called many times in parallel): - * - * <p/><pre> - * class Initiator - * { - * ConversationHelper conversation = new ConversationHelper(connection, null, - * java.util.concurrent.LinkedBlockingQueue.class); - * - * initiateConversation() - * { - * try { - * // Exchange greetings. - * conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello.")); - * Message greeting = conversation.receive(); - * - * // Exchange goodbyes. - * conversation.send(conversation.getSession().createTextMessage("Goodbye.")); - * Message goodbye = conversation.receive(); - * } finally { - * conversation.end(); - * } - * } - * } - * - * class Responder - * { - * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination, - * java.util.concurrent.LinkedBlockingQueue.class); - * - * respondToConversation() - * { - * try { - * // Exchange greetings. - * Message greeting = conversation.receive(); - * conversation.send(conversation.getSession().createTextMessage("Hello.")); - * - * // Exchange goodbyes. - * Message goodbye = conversation.receive(); - * conversation.send(conversation.getSession().createTextMessage("Goodbye.")); - * } finally { - * conversation.end(); - * } - * } - * } - * </pre> - * - * <p/>Conversation correlation id's are generated on a per thread basis. - * - * <p/>The same controlSession is shared amongst all conversations. Calls to send are therefore synchronized because JMS - * sessions are not multi-threaded. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Associate messages to an ongoing conversation using correlation ids. - * <tr><td> Auto manage sessions for conversations. - * <tr><td> Store messages not in a conversation in dead letter box. - * </table> - */ -public class ConversationFactory -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(ConversationFactory.class); - - /** Holds a map from correlation id's to queues. */ - private Map<Long, BlockingQueue<Message>> idsToQueues = new HashMap<Long, BlockingQueue<Message>>(); - - /** Holds the connection over which the conversation is conducted. */ - private Connection connection; - - /** Holds the controlSession over which the conversation is conduxted. */ - private Session session; - - /** The message consumer for incoming messages. */ - private MessageConsumer consumer; - - /** The message producer for outgoing messages. */ - private MessageProducer producer; - - /** The well-known or temporary destination to receive replies on. */ - private Destination receiveDestination; - - /** Holds the queue implementation class for the reply queue. */ - private Class<? extends BlockingQueue> queueClass; - - /** Used to hold any replies that are received outside of the context of a conversation. */ - private BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>(); - - /* Used to hold conversation state on a per thread basis. */ - /* - ThreadLocal<Conversation> threadLocals = - new ThreadLocal<Conversation>() - { - protected Conversation initialValue() - { - Conversation settings = new Conversation(); - settings.conversationId = conversationIdGenerator.getAndIncrement(); - - return settings; - } - }; - */ - - /** Generates new coversation id's as needed. */ - private AtomicLong conversationIdGenerator = new AtomicLong(); - - /** - * Creates a conversation helper on the specified connection with the default sending destination, and listening - * to the specified receiving destination. - * - * @param connection The connection to build the conversation helper on. - * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary - * queue. - * @param queueClass The queue implementation class. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ - public ConversationFactory(Connection connection, Destination receiveDestination, - Class<? extends BlockingQueue> queueClass) throws JMSException - { - log.debug("public ConversationFactory(Connection connection, Destination receiveDestination = " + receiveDestination - + ", Class<? extends BlockingQueue> queueClass = " + queueClass + "): called"); - - this.connection = connection; - this.queueClass = queueClass; - - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Check if a well-known receive destination has been provided, or use a temporary queue if not. - this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue(); - - consumer = session.createConsumer(receiveDestination); - producer = session.createProducer(null); - - consumer.setMessageListener(new Receiver()); - } - - /** - * Creates a new conversation context. - * - * @return A new conversation context. - */ - public Conversation startConversation() - { - log.debug("public Conversation startConversation(): called"); - - Conversation conversation = new Conversation(); - conversation.conversationId = conversationIdGenerator.getAndIncrement(); - - return conversation; - } - - /** - * Ensures that the reply queue for a conversation exists. - * - * @param conversationId The conversation correlation id. - */ - private void initQueueForId(long conversationId) - { - if (!idsToQueues.containsKey(conversationId)) - { - idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass)); - } - } - - /** - * Clears the dead letter box, returning all messages that were in it. - * - * @return All messages in the dead letter box. - */ - public Collection<Message> emptyDeadLetterBox() - { - log.debug("public Collection<Message> emptyDeadLetterBox(): called"); - - Collection<Message> result = new ArrayList<Message>(); - deadLetterBox.drainTo(result); - - return result; - } - - /** - * Gets the controlSession over which the conversation is conducted. - * - * @return The controlSession over which the conversation is conducted. - */ - public Session getSession() - { - // Conversation settings = threadLocals.get(); - - return session; - } - - /** - * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply - * destination automatically updated to the last received reply-to destination. - */ - public class Conversation - { - /** Holds the correlation id for the context. */ - private long conversationId; - - /** - * Holds the send destination for the context. This will automatically be updated to the most recently received - * reply-to destination. - */ - private Destination sendDestination; - - /** - * Sends a message to the default sending location. The correlation id of the message will be assigned by this - * method, overriding any previously set value. - * - * @param sendDestination The destination to send to. This may be null to use the last received reply-to - * destination. - * @param message The message to send. - * - * @throws JMSException All undelying JMSExceptions are allowed to fall through. This will also be thrown if no - * send destination is specified and there is no most recent reply-to destination available - * to use. - */ - public void send(Destination sendDestination, Message message) throws JMSException - { - log.debug("public void send(Destination sendDestination = " + sendDestination + ", Message message = " - + message.getJMSMessageID() + "): called"); - - // Conversation settings = threadLocals.get(); - // long conversationId = conversationId; - message.setJMSCorrelationID(Long.toString(conversationId)); - message.setJMSReplyTo(receiveDestination); - - // Ensure that the reply queue for this conversation exists. - initQueueForId(conversationId); - - // Check if an overriding send to destination has been set or use the last reply-to if not. - Destination sendTo = null; - - if (sendDestination != null) - { - sendTo = sendDestination; - } - else - { - throw new JMSException("The send destination was specified, and no most recent reply-to available to use."); - } - - // Send the message. - synchronized (this) - { - producer.send(sendTo, message); - } - } - - /** - * Gets the next message in an ongoing conversation. This method may block until such a message is received. - * - * @return The next incoming message in the conversation. - * - * @throws JMSException All undelying JMSExceptions are allowed to fall through. Thrown if the received message - * did not have its reply-to destination set up. - */ - public Message receive() throws JMSException - { - log.debug("public Message receive(): called"); - - // Conversation settings = threadLocals.get(); - // long conversationId = settings.conversationId; - - // Ensure that the reply queue for this conversation exists. - initQueueForId(conversationId); - - BlockingQueue<Message> queue = idsToQueues.get(conversationId); - - try - { - Message result = queue.take(); - - // Keep the reply-to destination to send replies to. - sendDestination = result.getJMSReplyTo(); - - return result; - } - catch (InterruptedException e) - { - return null; - } - } - - /** - * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are - * received they will be returned. If a timeout is specified, then all messages up to the limit, received within - * that timespan will be returned. At least one of the message count or timeout should be set to a value of - * 1 or greater. - * - * @param num The number of messages to receive, or all if this is less than 1. - * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1. - * - * @return All messages received within the count limit and the timeout. - * - * @throws JMSException All undelying JMSExceptions are allowed to fall through. - */ - public Collection<Message> receiveAll(int num, long timeout) throws JMSException - { - log.debug("public Collection<Message> receiveAll(int num = " + num + ", long timeout = " + timeout - + "): called"); - - // Check that a timeout or message count was set. - if ((num < 1) && (timeout < 1)) - { - throw new IllegalArgumentException("At least one of message count (num) or timeout must be set."); - } - - // Ensure that the reply queue for this conversation exists. - initQueueForId(conversationId); - BlockingQueue<Message> queue = idsToQueues.get(conversationId); - - // Used to collect the received messages in. - Collection<Message> result = new ArrayList<Message>(); - - // Used to indicate when the timeout or message count has expired. - boolean receiveMore = true; - - int messageCount = 0; - - // Receive messages until the timeout or message count expires. - do - { - try - { - Message next = null; - - // Try to receive the message with a timeout if one has been set. - if (timeout > 0) - { - next = queue.poll(timeout, TimeUnit.MILLISECONDS); - - // Check if the timeout expired, and stop receiving if so. - if (next == null) - { - receiveMore = false; - } - } - // Receive the message without a timeout. - else - { - next = queue.take(); - } - - // Increment the message count if a message was received. - messageCount += (next != null) ? 1 : 0; - - // Check if all the requested messages were received, and stop receiving if so. - if ((num > 0) && (messageCount >= num)) - { - receiveMore = false; - } - - // Keep the reply-to destination to send replies to. - sendDestination = (next != null) ? next.getJMSReplyTo() : sendDestination; - - if (next != null) - { - result.add(next); - } - } - catch (InterruptedException e) - { - // Restore the threads interrupted status. - Thread.currentThread().interrupt(); - - // Stop receiving but return the messages received so far. - receiveMore = false; - } - } - while (receiveMore); - - return result; - } - - /** - * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any - * incoming messages using them will go to the dead letter box. - */ - public void end() - { - log.debug("public void end(): called"); - - // Ensure that the thread local for the current thread is cleaned up. - // Conversation settings = threadLocals.get(); - // long conversationId = settings.conversationId; - // threadLocals.remove(); - - // Ensure that its queue is removed from the queue map. - BlockingQueue<Message> queue = idsToQueues.remove(conversationId); - - // Move any outstanding messages on the threads conversation id into the dead letter box. - queue.drainTo(deadLetterBox); - } - } - - /** - * Implements the message listener for this conversation handler. - */ - protected class Receiver implements MessageListener - { - /** - * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id - * and placed into queues. - * - * @param message The incoming message. - */ - public void onMessage(Message message) - { - log.debug("public void onMessage(Message message = " + message + "): called"); - - try - { - Long conversationId = Long.parseLong(message.getJMSCorrelationID()); - - // Find the converstaion queue to place the message on. If there is no conversation for the message id, - // the the dead letter box queue is used. - BlockingQueue<Message> queue = idsToQueues.get(conversationId); - queue = (queue == null) ? deadLetterBox : queue; - - queue.put(message); - } - catch (JMSException e) - { - throw new RuntimeException(e); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } -} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/util/ClasspathScanner.java b/qpid/java/systests/src/test/java/org/apache/qpid/util/ClasspathScanner.java deleted file mode 100644 index 151d1473ac..0000000000 --- a/qpid/java/systests/src/test/java/org/apache/qpid/util/ClasspathScanner.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.util; - -import org.apache.log4j.Logger; - -import java.io.File; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * An ClasspathScanner scans the classpath for classes that implement an interface or extend a base class and have names - * that match a regular expression. - * - * <p/>In order to test whether a class implements an interface or extends a class, the class must be loaded (unless - * the class files were to be scanned directly). Using this collector can cause problems when it scans the classpath, - * because loading classes will initialize their statics, which in turn may cause undesired side effects. For this - * reason, the collector should always be used with a regular expression, through which the class file names are - * filtered, and only those that pass this filter will be tested. For example, if you define tests in classes that - * end with the keyword "Test" then use the regular expression "Test$" to match this. - * - * <p><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Find all classes matching type and name pattern on the classpath. - * </table> - * - * @todo Add logic to scan jars as well as directories. - */ -public class ClasspathScanner -{ - private static final Logger log = Logger.getLogger(ClasspathScanner.class); - - /** - * Scans the classpath and returns all classes that extend a specified class and match a specified name. - * There is an flag that can be used to indicate that only Java Beans will be matched (that is, only those classes - * that have a default constructor). - * - * @param matchingClass The class or interface to match. - * @param matchingRegexp The regular expression to match against the class name. - * @param beanOnly Flag to indicate that onyl classes with default constructors should be matched. - * - * @return All the classes that match this collector. - */ - public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass, String matchingRegexp, - boolean beanOnly) - { - log.debug("public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass = " + matchingClass - + ", String matchingRegexp = " + matchingRegexp + ", boolean beanOnly = " + beanOnly + "): called"); - - // Build a compiled regular expression from the pattern to match. - Pattern matchPattern = Pattern.compile(matchingRegexp); - - String classPath = System.getProperty("java.class.path"); - Map<String, Class<? extends T>> result = new HashMap<String, Class<? extends T>>(); - - log.debug("classPath = " + classPath); - - // Find matching classes starting from all roots in the classpath. - for (String path : splitClassPath(classPath)) - { - gatherFiles(new File(path), "", result, matchPattern, matchingClass); - } - - return result.values(); - } - - /** - * Finds all matching classes rooted at a given location in the file system. If location is a directory it - * is recursively examined. - * - * @param classRoot The root of the current point in the file system being examined. - * @param classFileName The name of the current file or directory to examine. - * @param result The accumulated mapping from class names to classes that match the scan. - * - * @todo Recursion ok as file system depth is not likely to exhaust the stack. Might be better to replace with - * iteration. - */ - private static <T> void gatherFiles(File classRoot, String classFileName, Map<String, Class<? extends T>> result, - Pattern matchPattern, Class<? extends T> matchClass) - { - log.debug("private static <T> void gatherFiles(File classRoot = " + classRoot + ", String classFileName = " - + classFileName + ", Map<String, Class<? extends T>> result, Pattern matchPattern = " + matchPattern - + ", Class<? extends T> matchClass = " + matchClass + "): called"); - - File thisRoot = new File(classRoot, classFileName); - - // If the current location is a file, check if it is a matching class. - if (thisRoot.isFile()) - { - // Check that the file has a matching name. - if (matchesName(thisRoot.getName(), matchPattern)) - { - String className = classNameFromFile(thisRoot.getName()); - - // Check that the class has matching type. - try - { - Class<?> candidateClass = Class.forName(className); - - Class matchedClass = matchesClass(candidateClass, matchClass); - - if (matchedClass != null) - { - result.put(className, matchedClass); - } - } - catch (ClassNotFoundException e) - { - // Ignore this. The matching class could not be loaded. - log.debug("Got ClassNotFoundException, ignoring.", e); - } - } - - return; - } - // Otherwise the current location is a directory, so examine all of its contents. - else - { - String[] contents = thisRoot.list(); - - if (contents != null) - { - for (String content : contents) - { - gatherFiles(classRoot, classFileName + File.separatorChar + content, result, matchPattern, matchClass); - } - } - } - } - - /** - * Checks if the specified class file name corresponds to a class with name matching the specified regular expression. - * - * @param classFileName The class file name. - * @param matchPattern The regular expression pattern to match. - * - * @return <tt>true</tt> if the class name matches, <tt>false</tt> otherwise. - */ - private static boolean matchesName(String classFileName, Pattern matchPattern) - { - String className = classNameFromFile(classFileName); - Matcher matcher = matchPattern.matcher(className); - - return matcher.matches(); - } - - /** - * Checks if the specified class to compare extends the base class being scanned for. - * - * @param matchingClass The base class to match against. - * @param toMatch The class to match against the base class. - * - * @return The class to check, cast as an instance of the class to match if the class extends the base class, or - * <tt>null</tt> otherwise. - */ - private static <T> Class<? extends T> matchesClass(Class<?> matchingClass, Class<? extends T> toMatch) - { - try - { - return matchingClass.asSubclass(toMatch); - } - catch (ClassCastException e) - { - return null; - } - } - - /** - * Takes a classpath (which is a series of paths) and splits it into its component paths. - * - * @param classPath The classpath to split. - * - * @return A list of the component paths that make up the class path. - */ - private static List<String> splitClassPath(String classPath) - { - List<String> result = new LinkedList<String>(); - String separator = System.getProperty("path.separator"); - StringTokenizer tokenizer = new StringTokenizer(classPath, separator); - - while (tokenizer.hasMoreTokens()) - { - result.add(tokenizer.nextToken()); - } - - return result; - } - - /** - * Translates from the filename of a class to its fully qualified classname. Files are named using forward slash - * seperators and end in ".class", whereas fully qualified class names use "." sperators and no ".class" ending. - * - * @param classFileName The filename of the class to translate to a class name. - * - * @return The fully qualified class name. - */ - private static String classNameFromFile(String classFileName) - { - log.debug("private static String classNameFromFile(String classFileName = " + classFileName + "): called"); - - // Remove the .class ending. - String s = classFileName.substring(0, classFileName.length() - ".class".length()); - - // Turn / seperators in . seperators. - String s2 = s.replace(File.separatorChar, '.'); - - // Knock off any leading . caused by a leading /. - if (s2.startsWith(".")) - { - return s2.substring(1); - } - - return s2; - } -} |