summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-25 15:15:31 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-25 15:15:31 +0000
commite0e8f4c5087c1c5dc787740d6bd862755bd8daf1 (patch)
treeda456e90c9b57ba69f255e564c2945aaf2c7617b
parent78a00e2a3a1bbc7486de0fad72603617958062c3 (diff)
downloadqpid-python-e0e8f4c5087c1c5dc787740d6bd862755bd8daf1.tar.gz
Merging from trunk r1617822:1618206 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620339 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java5
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java2
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java9
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/logging/subjects/BDBHAVirtualHostNodeLogSubject.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/VirtualHostNodeLogSubject.java)11
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/logging/subjects/GroupLogSubject.java (renamed from qpid/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java)41
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java49
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java152
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java6
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java140
-rw-r--r--qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java6
-rw-r--r--qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java14
-rw-r--r--qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java82
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java137
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties55
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java7
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java84
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java131
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java4
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java2
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/DefinedFileServlet.java25
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java27
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/UnsupportedAddressSyntaxException.java (renamed from qpid/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java)38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java24
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/PlainSaslClient.java8
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UserPasswordCallbackHandler.java6
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UsernameHashedPasswordCallbackHandler.java12
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java12
-rw-r--r--qpid/java/pom.xml1
-rw-r--r--qpid/java/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java16
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java8
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java68
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/utils/ConversationFactory.java484
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/util/ClasspathScanner.java239
45 files changed, 639 insertions, 1343 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index 1a72e129e7..508aaf7518 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -173,7 +173,10 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
}
else
{
- throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+ JMSException jmsException =
+ new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+ jmsException.initCause(e);
+ throw jmsException;
}
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java
index 302060776a..82f29ea4b1 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java
@@ -34,7 +34,7 @@ public class ConnectionErrorException extends ConnectionException
public ConnectionErrorException(Error remoteError)
{
- super(remoteError.getDescription());
+ super(remoteError.getDescription() == null ? remoteError.toString() : remoteError.getDescription());
_remoteError = remoteError;
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
index a2a15779d2..826a757850 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
@@ -194,9 +194,14 @@ public class Receiver implements DeliveryStateHandler
}
catch (InterruptedException e)
{
- throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted whil waiting for detach following failed attach");
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted while waiting for detach following failed attach");
}
- throw new ConnectionErrorException(getError());
+ throw new ConnectionErrorException(getError().getCondition(),
+ getError().getDescription() == null
+ ? "AMQP error: '" + getError().getCondition().toString()
+ + "' when attempting to create a receiver"
+ + (source != null ? " from: '" + source.getAddress() +"'" : "")
+ : getError().getDescription());
}
else
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/VirtualHostNodeLogSubject.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/logging/subjects/BDBHAVirtualHostNodeLogSubject.java
index fad9a91841..a209062993 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/VirtualHostNodeLogSubject.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/logging/subjects/BDBHAVirtualHostNodeLogSubject.java
@@ -20,14 +20,13 @@
*/
package org.apache.qpid.server.logging.subjects;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.VIRTUAL_HOST_NODE_FORMAT;
-import org.apache.qpid.server.model.VirtualHostNode;
-
-public class VirtualHostNodeLogSubject extends AbstractLogSubject
+public class BDBHAVirtualHostNodeLogSubject extends AbstractLogSubject
{
- public VirtualHostNodeLogSubject(String nodeName)
+ public static final String VIRTUAL_HOST_NODE_FORMAT = "grp(/{0})/vhn(/{1})";
+
+ public BDBHAVirtualHostNodeLogSubject(String groupName, String nodeName)
{
- setLogStringWithFormat(VIRTUAL_HOST_NODE_FORMAT, nodeName);
+ setLogStringWithFormat(VIRTUAL_HOST_NODE_FORMAT, groupName, nodeName);
}
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/logging/subjects/GroupLogSubject.java
index ec67fc68b3..51fd1fc2dc 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/logging/subjects/GroupLogSubject.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,40 +18,15 @@
* under the License.
*
*/
-package org.apache.qpid.server.util;
-
-public class RunStats
-{
- private long min = Long.MAX_VALUE;
- private long max;
- private long total;
- private int count;
+package org.apache.qpid.server.logging.subjects;
- public void record(long time)
- {
- max = Math.max(time, max);
- min = Math.min(time, min);
- total += time;
- count++;
- }
- public long getMin()
- {
- return min;
- }
-
- public long getMax()
- {
- return max;
- }
-
- public long getAverage()
- {
- return total / count;
- }
+public class GroupLogSubject extends AbstractLogSubject
+{
+ public static final String GROUP_FORMAT = "grp(/{0})";
- public String toString()
+ public GroupLogSubject(String groupName)
{
- return "avg=" + getAverage() + ", min=" + min + ", max=" + max;
+ setLogStringWithFormat(GROUP_FORMAT, groupName);
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 835846a5ec..78cddc708e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -1333,7 +1333,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
data = new byte[0];
}
}
- return ByteBuffer.wrap(data,offsetInMessage,size);
+ return ByteBuffer.wrap(data,offsetInMessage,Math.min(size,data.length-offsetInMessage));
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
index 5263f5942f..06671998ec 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
@@ -30,9 +30,14 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import com.sleepycat.je.rep.MasterStateException;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.HighAvailabilityMessages;
+import org.apache.qpid.server.logging.subjects.BDBHAVirtualHostNodeLogSubject;
+import org.apache.qpid.server.logging.subjects.GroupLogSubject;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -40,6 +45,8 @@ import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.model.SystemConfig;
+import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
@@ -53,13 +60,16 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
private volatile long _joinTime;
private volatile long _lastTransactionId;
+ private volatile String _lastReplicatedEnvironmentState = ReplicatedEnvironment.State.UNKNOWN.name();
@ManagedAttributeField(afterSet="afterSetRole")
- private volatile String _role;
+ private volatile String _role = ReplicatedEnvironment.State.UNKNOWN.name();
private final AtomicReference<State> _state;
private final boolean _isMonitor;
private boolean _detached;
+ private BDBHAVirtualHostNodeLogSubject _virtualHostNodeLogSubject;
+ private GroupLogSubject _groupLogSubject;
public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNode<?> virtualHostNode, Map<String, Object> attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade)
{
@@ -92,7 +102,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
@Override
public String getRole()
{
- return _role;
+ return _lastReplicatedEnvironmentState;
}
@Override
@@ -152,10 +162,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
{
String nodeName = getName();
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleting node '" + nodeName + "' from group '" + getGroupName() + "'");
- }
+ getEventLogger().message(_virtualHostNodeLogSubject, HighAvailabilityMessages.DELETED());
try
{
@@ -178,19 +185,11 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
try
{
String nodeName = getName();
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Trying to transfer master to '" + nodeName + "'");
- }
+ getEventLogger().message(_groupLogSubject, HighAvailabilityMessages.TRANSFER_MASTER(getName(), getAddress()));
_replicatedEnvironmentFacade.transferMasterAsynchronously(nodeName);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("The transfer of mastership to node '" + nodeName + "' has been initiated.");
- }
}
- catch(Exception e)
+ catch (Exception e)
{
throw new IllegalConfigurationException("Cannot transfer mastership to '" + getName() + "'", e);
}
@@ -227,10 +226,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
void setRole(String role)
{
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug(this + " updating role to : " + role);
- }
+ _lastReplicatedEnvironmentState = role;
_role = role;
updateModelStateFromRole(role);
}
@@ -266,4 +262,17 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
{
this._detached = detached;
}
+
+ @Override
+ public void onValidate()
+ {
+ super.onValidate();
+ _virtualHostNodeLogSubject = new BDBHAVirtualHostNodeLogSubject(getGroupName(), getName());
+ _groupLogSubject = new GroupLogSubject(getGroupName());
+ }
+
+ private EventLogger getEventLogger()
+ {
+ return ((SystemConfig)getParent(VirtualHostNode.class).getParent(Broker.class).getParent(SystemConfig.class)).getEventLogger();
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 5489493f74..fd098a8ef6 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
+import java.text.MessageFormat;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -49,6 +50,8 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.HighAvailabilityMessages;
+import org.apache.qpid.server.logging.subjects.BDBHAVirtualHostNodeLogSubject;
+import org.apache.qpid.server.logging.subjects.GroupLogSubject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -75,6 +78,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl>
{
public static final String VIRTUAL_HOST_NODE_TYPE = "BDB_HA";
+ public static final String VIRTUAL_HOST_PRINCIPAL_NAME_FORMAT = "grp(/{0})/vhn(/{1})";
/**
* Length of time we synchronously await the a JE mutation to complete. It is not considered an error if we exceed this timeout, although a
@@ -87,6 +91,9 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
private final AtomicReference<ReplicatedEnvironmentFacade> _environmentFacade = new AtomicReference<>();
private final AtomicReference<ReplicatedEnvironment.State> _lastReplicatedEnvironmentState = new AtomicReference<>(ReplicatedEnvironment.State.UNKNOWN);
+ private BDBHAVirtualHostNodeLogSubject _virtualHostNodeLogSubject;
+ private GroupLogSubject _groupLogSubject;
+ private String _virtualHostNodePrincipalName;
@ManagedAttributeField
private String _storePath;
@@ -267,7 +274,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ADDED(getName(), getGroupName()));
+ getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.CREATED());
}
protected ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade()
@@ -291,8 +298,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
getConfigurationStore().openConfigurationStore(this, false);
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ATTACHED(getName(), getGroupName(), getRole()));
-
getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.CREATED());
getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.STORE_LOCATION(getStorePath()));
@@ -325,23 +330,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
ReplicatedEnvironment.State detached = ReplicatedEnvironment.State.DETACHED;
_lastReplicatedEnvironmentState.set(detached);
attributeSet(ROLE, _role, detached);
-
- //Perhaps, having STOPPED operational logging could be sufficient. However, on START we still will be seeing 2 logs: ATTACHED and STARTED
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(getName(), getGroupName()));
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.STOPPED(getName(), getGroupName()));
- }
- }
-
- @StateTransition( currentState = { State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
- protected void doActivate()
- {
- try
- {
- super.doActivate();
- }
- finally
- {
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.STARTED(getName(), getGroupName()));
}
}
@@ -359,7 +347,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
super.doDelete();
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED(getName(), getGroupName()));
+ getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
if (getState() == State.DELETED && !helpers.isEmpty())
{
try
@@ -413,10 +401,18 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
finally
{
closeEnvironment();
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(getName(), getGroupName()));
}
}
+ @Override
+ public void onValidate()
+ {
+ super.onValidate();
+ _virtualHostNodeLogSubject = new BDBHAVirtualHostNodeLogSubject(getGroupName(), getName());
+ _groupLogSubject = new GroupLogSubject(getGroupName());
+ _virtualHostNodePrincipalName = MessageFormat.format(VIRTUAL_HOST_PRINCIPAL_NAME_FORMAT, getGroupName(), getName());
+ }
+
private void onMaster()
{
try
@@ -550,7 +546,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
LOGGER.info("Received BDB event indicating transition to state " + state);
}
-
+ String previousRole = getRole();
try
{
switch (state)
@@ -573,10 +569,9 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
finally
{
_lastReplicatedEnvironmentState.set(state);
- String previousRole = _role;
attributeSet(ROLE, _role, state.name());
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ROLE_CHANGED(getName(),
- getGroupName(), previousRole, state.name()));
+ getEventLogger().message(getGroupLogSubject(),
+ HighAvailabilityMessages.ROLE_CHANGED(getName(), getAddress(), previousRole, state.name()));
}
}
}
@@ -591,8 +586,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
try
{
environmentFacade.setPriority(_priority).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.PRIORITY_CHANGED(getName(),
- getGroupName(), String.valueOf(_priority)));
+ getEventLogger().message(getVirtualHostNodeLogSubject(),
+ HighAvailabilityMessages.PRIORITY_CHANGED(String.valueOf(_priority)));
}
catch (TimeoutException e)
{
@@ -619,8 +614,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
try
{
environmentFacade.setDesignatedPrimary(_designatedPrimary).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED(getName(),
- getGroupName(), String.valueOf(_designatedPrimary)));
+ getEventLogger().message(getVirtualHostNodeLogSubject(),
+ HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED(String.valueOf(_designatedPrimary)));
}
catch (TimeoutException e)
{
@@ -647,8 +642,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
try
{
environmentFacade.setElectableGroupSizeOverride(_quorumOverride).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED(getName(),
- getGroupName(), String.valueOf(_quorumOverride)));
+ getEventLogger().message(getVirtualHostNodeLogSubject(),
+ HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED(String.valueOf(_quorumOverride)));
}
catch (TimeoutException e)
{
@@ -674,8 +669,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
try
{
+ getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.TRANSFER_MASTER(getName(), getAddress()));
environmentFacade.transferMasterToSelfAsynchronously().get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.TRANSFER_MASTER(getName(), getName(), getGroupName()));
}
catch (TimeoutException e)
{
@@ -701,18 +696,27 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
return getAddress().equals(getHelperAddress());
}
+ BDBHAVirtualHostNodeLogSubject getVirtualHostNodeLogSubject()
+ {
+ return _virtualHostNodeLogSubject;
+ }
+
+ GroupLogSubject getGroupLogSubject()
+ {
+ return _groupLogSubject;
+ }
+
private class RemoteNodesDiscoverer implements ReplicationGroupListener
{
@Override
public void onReplicationNodeAddedToGroup(final ReplicationNode node)
{
- getTaskExecutor().submit(new Task<Void>()
+ getTaskExecutor().submit(new VirtualHostNodeGroupTask()
{
@Override
- public Void execute()
+ public void perform()
{
addRemoteReplicationNode(node);
- return null;
}
});
}
@@ -722,19 +726,18 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade());
remoteNode.create();
childAdded(remoteNode);
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ADDED(remoteNode.getName(), getGroupName()));
+ getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.ADDED(remoteNode.getName(), remoteNode.getAddress()));
}
@Override
public void onReplicationNodeRecovered(final ReplicationNode node)
{
- getTaskExecutor().submit(new Task<Void>()
+ getTaskExecutor().submit(new VirtualHostNodeGroupTask()
{
@Override
- public Void execute()
+ public void perform()
{
recoverRemoteReplicationNode(node);
- return null;
}
});
}
@@ -745,19 +748,18 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
remoteNode.registerWithParents();
remoteNode.open();
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ATTACHED(remoteNode.getName(), getGroupName(), String.valueOf(remoteNode.getState())));
+ getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.JOINED(remoteNode.getName(), remoteNode.getAddress()));
}
@Override
public void onReplicationNodeRemovedFromGroup(final ReplicationNode node)
{
- getTaskExecutor().submit(new Task<Void>()
+ getTaskExecutor().submit(new VirtualHostNodeGroupTask()
{
@Override
- public Void execute()
+ public void perform()
{
removeRemoteReplicationNode(node);
- return null;
}
});
}
@@ -768,12 +770,25 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
if (remoteNode != null)
{
remoteNode.deleted();
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED(remoteNode.getName(), getGroupName()));
+ getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.REMOVED(remoteNode.getName(), remoteNode.getAddress()));
}
}
@Override
- public void onNodeState(ReplicationNode node, NodeState nodeState)
+ public void onNodeState(final ReplicationNode node, final NodeState nodeState)
+ {
+ Subject.doAs(SecurityManager.getSystemTaskSubject(_virtualHostNodePrincipalName), new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ processNodeState(node, nodeState);
+ return null;
+ }
+ });
+ }
+
+ private void processNodeState(ReplicationNode node, NodeState nodeState)
{
BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName());
if (remoteNode != null)
@@ -785,7 +800,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
remoteNode.setLastTransactionId(-1);
if (!remoteNode.isDetached())
{
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(remoteNode.getName(), getGroupName()));
+ getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.LEFT(remoteNode.getName(), remoteNode.getAddress()));
remoteNode.setDetached(true);
}
}
@@ -796,7 +811,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
remoteNode.setRole(nodeState.getNodeState().name());
if (remoteNode.isDetached())
{
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ATTACHED(remoteNode.getName(), getGroupName(), remoteNode.getRole() ));
+ getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.JOINED(remoteNode.getName(), remoteNode.getAddress()));
remoteNode.setDetached(false);
}
}
@@ -804,17 +819,29 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
String newRole = remoteNode.getRole();
if (!newRole.equals(currentRole))
{
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.ROLE_CHANGED(remoteNode.getName(),
- getGroupName(), currentRole, newRole));
+ getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.ROLE_CHANGED(remoteNode.getName(), remoteNode.getAddress(), currentRole, newRole));
}
}
}
@Override
- public void onIntruderNode(ReplicationNode node)
+ public void onIntruderNode(final ReplicationNode node)
+ {
+ Subject.doAs(SecurityManager.getSystemTaskSubject(_virtualHostNodePrincipalName), new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ processIntruderNode(node);
+ return null;
+ }
+ });
+ }
+
+ private void processIntruderNode(ReplicationNode node)
{
String hostAndPort = node.getHostName() + ":" + node.getPort();
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), hostAndPort, getGroupName()));
+ getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), hostAndPort));
boolean inManagementMode = getParent(Broker.class).isManagementMode();
if (inManagementMode)
@@ -858,7 +885,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
@Override
public void onNoMajority()
{
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.MAJORITY_LOST(getName(), getGroupName()));
+ getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.QUORUM_LOST());
}
private Map<String, Object> nodeToAttributes(ReplicationNode replicationNode)
@@ -872,4 +899,23 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
+ private abstract class VirtualHostNodeGroupTask implements Task<Void>
+ {
+ @Override
+ public Void execute()
+ {
+ return Subject.doAs(SecurityManager.getSystemTaskSubject(_virtualHostNodePrincipalName), new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ perform();
+ return null;
+ }
+ });
+ }
+
+ abstract void perform();
+ }
+
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java
index 6259b49d61..0d64d87aef 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeTest.java
@@ -19,6 +19,7 @@
package org.apache.qpid.server.virtualhostnode.berkeleydb;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -33,6 +34,7 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -69,7 +71,7 @@ public class BDBHARemoteReplicationNodeTest extends QpidTestCase
// Virtualhost needs the EventLogger from the SystemContext.
when(_virtualHostNode.getParent(Broker.class)).thenReturn(_broker);
-
+ doReturn(VirtualHostNode.class).when(_virtualHostNode).getCategoryClass();
ConfiguredObjectFactory objectFactory = _broker.getObjectFactory();
when(_virtualHostNode.getModel()).thenReturn(objectFactory.getModel());
when(_virtualHostNode.getTaskExecutor()).thenReturn(_taskExecutor);
@@ -80,7 +82,7 @@ public class BDBHARemoteReplicationNodeTest extends QpidTestCase
String remoteReplicationName = getName();
BDBHARemoteReplicationNode remoteReplicationNode = createRemoteReplicationNode(remoteReplicationName);
- remoteReplicationNode.setAttribute(BDBHARemoteReplicationNode.ROLE, null, "MASTER");
+ remoteReplicationNode.setAttribute(BDBHARemoteReplicationNode.ROLE, "UNKNOWN", "MASTER");
verify(_facade).transferMasterAsynchronously(remoteReplicationName);
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java
index ef1021160c..ea7d74090d 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeOperationalLoggingTest.java
@@ -78,67 +78,18 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase
_helper.assertNodeRole(node1, "MASTER");
- String expectedMessage = HighAvailabilityMessages.ADDED(node1.getName(), node1.getGroupName()).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ADDED_LOG_HIERARCHY)));
-
- expectedMessage = HighAvailabilityMessages.ATTACHED(node1.getName(), node1.getGroupName(), "UNKNOWN").toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ATTACHED_LOG_HIERARCHY)));
+ assertEquals("Unexpected VHN log subject", "[grp(/group)/vhn(/node1)] ", node1.getVirtualHostNodeLogSubject().getLogString());
+ assertEquals("Unexpected group log subject", "[grp(/group)] ", node1.getGroupLogSubject().getLogString());
-
- expectedMessage = HighAvailabilityMessages.STARTED(node1.getName(), node1.getGroupName()).toString();
+ String expectedMessage = HighAvailabilityMessages.CREATED().toString();
verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.STARTED_LOG_HIERARCHY)));
+ argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.CREATED_LOG_HIERARCHY)));
- expectedMessage = HighAvailabilityMessages.ROLE_CHANGED(node1.getName(), node1.getGroupName(), "UNKNOWN", "MASTER").toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
+ expectedMessage = HighAvailabilityMessages.ROLE_CHANGED(node1.getName(), node1.getAddress(), "UNKNOWN", "MASTER").toString();
+ verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())),
argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ROLE_CHANGED_LOG_HIERARCHY)));
}
- public void testStop() throws Exception
- {
- int node1PortNumber = findFreePort();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
- BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes);
- _helper.assertNodeRole(node1, "MASTER");
- reset(_eventLogger);
-
- node1.stop();
-
- String expectedMessage = HighAvailabilityMessages.DETACHED(node1.getName(), node1.getGroupName()).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DETACHED_LOG_HIERARCHY)));
-
- expectedMessage = HighAvailabilityMessages.STOPPED(node1.getName(), node1.getGroupName()).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.STOPPED_LOG_HIERARCHY)));
- }
-
- public void testClose() throws Exception
- {
- int node1PortNumber = findFreePort();
- String helperAddress = "localhost:" + node1PortNumber;
- String groupName = "group";
- String nodeName = "node1";
-
- Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
- BDBHAVirtualHostNodeImpl node1 = (BDBHAVirtualHostNodeImpl)_helper.createHaVHN(node1Attributes);
- _helper.assertNodeRole(node1, "MASTER");
-
- reset(_eventLogger);
-
- node1.close();
-
- String expectedMessage = HighAvailabilityMessages.DETACHED(node1.getName(), node1.getGroupName()).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DETACHED_LOG_HIERARCHY)));
- }
-
public void testDelete() throws Exception
{
int node1PortNumber = findFreePort();
@@ -154,13 +105,10 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase
node1.delete();
- String expectedMessage = HighAvailabilityMessages.DETACHED(node1.getName(), node1.getGroupName()).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DETACHED_LOG_HIERARCHY)));
-
- expectedMessage = HighAvailabilityMessages.DELETED(node1.getName(), node1.getGroupName()).toString();
+ String expectedMessage = HighAvailabilityMessages.DELETED().toString();
verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DELETED_LOG_HIERARCHY)));
+
}
public void testSetPriority() throws Exception
@@ -181,7 +129,7 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase
// make sure that task executor thread finishes all scheduled tasks
node1.stop();
- String expectedMessage = HighAvailabilityMessages.PRIORITY_CHANGED(node1.getName(), node1.getGroupName(), "10").toString();
+ String expectedMessage = HighAvailabilityMessages.PRIORITY_CHANGED("10").toString();
verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.PRIORITY_CHANGED_LOG_HIERARCHY)));
}
@@ -204,7 +152,7 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase
// make sure that task executor thread finishes all scheduled tasks
node1.stop();
- String expectedMessage = HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED(node1.getName(), node1.getGroupName(), "1").toString();
+ String expectedMessage = HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED("1").toString();
verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY)));
}
@@ -227,7 +175,7 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase
// make sure that task executor thread finishes all scheduled tasks
node1.stop();
- String expectedMessage = HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED(node1.getName(), node1.getGroupName(), "true").toString();
+ String expectedMessage = HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED("true").toString();
verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DESIGNATED_PRIMARY_CHANGED_LOG_HIERARCHY)));
}
@@ -254,14 +202,9 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase
// make sure that task executor thread finishes all scheduled tasks
node2.stop();
- // Verify ADDED message from node2 when its created
- String expectedMessage = HighAvailabilityMessages.ADDED(node2.getName(), groupName).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node2.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ADDED_LOG_HIERARCHY)));
-
// Verify ADDED message from node1 when it discovers node2 has been added
- expectedMessage = HighAvailabilityMessages.ADDED(node2.getName(), groupName).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
+ String expectedMessage = HighAvailabilityMessages.ADDED(node2.getName(), node2.getAddress()).toString();
+ verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())),
argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.ADDED_LOG_HIERARCHY)));
}
@@ -292,9 +235,9 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase
// make sure that task executor thread finishes all scheduled tasks
node1.stop();
- String expectedMessage = HighAvailabilityMessages.DELETED(node2.getName(), groupName).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DELETED_LOG_HIERARCHY)));
+ String expectedMessage = HighAvailabilityMessages.REMOVED(node2.getName(), node2.getAddress()).toString();
+ verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())),
+ argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.REMOVED_LOG_HIERARCHY)));
}
public void testRemoteNodeDetached() throws Exception
@@ -324,9 +267,9 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase
waitForNodeDetachedField(remoteNode, true);
// verify that remaining node issues the DETACHED operational logging for remote node
- String expectedMessage = HighAvailabilityMessages.DETACHED(node2.getName(), groupName).toString();
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.DETACHED_LOG_HIERARCHY)));
+ String expectedMessage = HighAvailabilityMessages.LEFT(node2.getName(), node2.getAddress()).toString();
+ verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())),
+ argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.LEFT_LOG_HIERARCHY)));
}
@@ -361,48 +304,9 @@ public class BDBHAVirtualHostNodeOperationalLoggingTest extends QpidTestCase
_helper.assertNodeRole(node2, "REPLICA", "MASTER");
waitForNodeDetachedField(remoteNode, false);
- final String expectedMessage = HighAvailabilityMessages.ATTACHED(node2.getName(), groupName, "REPLICA").toString();
- final String expectedMessage2 = HighAvailabilityMessages.ATTACHED(node2.getName(), groupName, "UNKNOWN").toString();
- final String expectedMessage3 = HighAvailabilityMessages.ATTACHED(node2.getName(), groupName, "MASTER").toString();
- ArgumentMatcher<LogMessage> matcher = new ArgumentMatcher<LogMessage>()
- {
- private String _messageErrorDescription = null;
- private String _hierarchyErrorDescription = null;
-
- @Override
- public boolean matches(Object argument)
- {
- LogMessage logMessage = (LogMessage)argument;
- String actualMessage = logMessage.toString();
- boolean expectedMessageMatches = expectedMessage.equals(actualMessage)
- || expectedMessage2.equals(actualMessage) || expectedMessage3.equals(actualMessage);
- if (!expectedMessageMatches)
- {
- _messageErrorDescription = "Actual message does not match any expected: " + actualMessage;
- }
- boolean expectedHierarchyMatches = HighAvailabilityMessages.ATTACHED_LOG_HIERARCHY.equals(logMessage.getLogHierarchy());
- if (!expectedHierarchyMatches)
- {
- _hierarchyErrorDescription = "Actual hierarchy does not match expected: " + logMessage.getLogHierarchy();
- }
- return expectedMessageMatches && expectedHierarchyMatches;
- }
-
- @Override
- public void describeTo(Description description)
- {
- if (_messageErrorDescription != null)
- {
- description.appendText(_messageErrorDescription);
- }
- if (_hierarchyErrorDescription != null)
- {
- description.appendText(_hierarchyErrorDescription);
- }
- }
- };
- verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getVirtualHostNodeLogSubject())),
- argThat(matcher));
+ final String expectedMessage = HighAvailabilityMessages.JOINED(node2.getName(), node2.getAddress()).toString();
+ verify(_eventLogger).message(argThat(new LogSubjectMatcher(node1.getGroupLogSubject())),
+ argThat(new LogMessageMatcher(expectedMessage, HighAvailabilityMessages.JOINED_LOG_HIERARCHY)));
}
private void waitForNodeDetachedField(BDBHARemoteReplicationNodeImpl remoteNode, boolean expectedDetached) throws InterruptedException {
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
index e78ef34759..f7dce4f3f5 100644
--- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
+++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
@@ -75,9 +75,9 @@ public class GroupCreator
private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
- private static final int FAILOVER_CYCLECOUNT = 10;
- private static final int FAILOVER_RETRIES = 1;
- private static final int FAILOVER_CONNECTDELAY = 1000;
+ private static final int FAILOVER_CYCLECOUNT = 20;
+ private static final int FAILOVER_RETRIES = 0;
+ private static final int FAILOVER_CONNECTDELAY = 500;
private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'";
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java
index c6f005c0e7..63de287be7 100644
--- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java
+++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java
@@ -206,20 +206,6 @@ public class JMXManagementTest extends QpidBrokerTestCase
}
}
- public void testSetDesignatedPrimary() throws Exception
- {
- int brokerPort = _clusterCreator.getBrokerPortNumbersForNodes().iterator().next();
- final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPort);
- assertFalse("Unexpected designated primary before change", storeBean.getDesignatedPrimary());
- storeBean.setDesignatedPrimary(true);
- long limit = System.currentTimeMillis() + 5000;
- while(!storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit)
- {
- Thread.sleep(100l);
- }
- assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary());
- }
-
public void testVirtualHostMbeanOnMasterTransfer() throws Exception
{
Connection connection = getConnection(_brokerFailoverUrl);
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
index 0f8a1609de..248dbb4def 100644
--- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
+++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
@@ -20,27 +20,29 @@
package org.apache.qpid.server.store.berkeleydb.replication;
import java.io.File;
+import java.util.Collections;
+import java.util.Map;
import javax.jms.Connection;
import javax.jms.JMSException;
-import javax.management.ObjectName;
import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
-import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class TwoNodeTest extends QpidBrokerTestCase
{
private static final String VIRTUAL_HOST = "test";
- private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST);
private static final int NUMBER_OF_NODES = 2;
private final GroupCreator _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
- private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
- private ConnectionURL _brokerFailoverUrl;
+ /** Used when expectation is client will not (re)-connect */
+ private ConnectionURL _positiveFailoverUrl;
+
+ /** Used when expectation is client will not (re)-connect */
+ private ConnectionURL _negativeFailoverUrl;
@Override
protected void setUp() throws Exception
@@ -54,19 +56,6 @@ public class TwoNodeTest extends QpidBrokerTestCase
}
@Override
- protected void tearDown() throws Exception
- {
- try
- {
- _jmxUtils.close();
- }
- finally
- {
- super.tearDown();
- }
- }
-
- @Override
public void startBroker() throws Exception
{
// Don't start default broker provided by QBTC.
@@ -77,20 +66,21 @@ public class TwoNodeTest extends QpidBrokerTestCase
setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
_groupCreator.configureClusterNodes();
_groupCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary);
- _brokerFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes();
+ _positiveFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes();
+ _negativeFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(200, 0, 2);
_groupCreator.startCluster();
}
public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception
{
startCluster(true);
- final Connection initialConnection = getConnection(_brokerFailoverUrl);
+ final Connection initialConnection = getConnection(_positiveFailoverUrl);
int masterPort = _groupCreator.getBrokerPortNumberFromConnection(initialConnection);
assertProducingConsuming(initialConnection);
initialConnection.close();
_groupCreator.stopCluster();
_groupCreator.startNode(masterPort);
- final Connection secondConnection = getConnection(_brokerFailoverUrl);
+ final Connection secondConnection = getConnection(_positiveFailoverUrl);
assertProducingConsuming(secondConnection);
secondConnection.close();
}
@@ -98,12 +88,12 @@ public class TwoNodeTest extends QpidBrokerTestCase
public void testClusterRestartWithoutDesignatedPrimary() throws Exception
{
startCluster(false);
- final Connection initialConnection = getConnection(_brokerFailoverUrl);
+ final Connection initialConnection = getConnection(_positiveFailoverUrl);
assertProducingConsuming(initialConnection);
initialConnection.close();
_groupCreator.stopCluster();
_groupCreator.startClusterParallel();
- final Connection secondConnection = getConnection(_brokerFailoverUrl);
+ final Connection secondConnection = getConnection(_positiveFailoverUrl);
assertProducingConsuming(secondConnection);
secondConnection.close();
}
@@ -112,7 +102,7 @@ public class TwoNodeTest extends QpidBrokerTestCase
{
startCluster(true);
_groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
- final Connection connection = getConnection(_brokerFailoverUrl);
+ final Connection connection = getConnection(_positiveFailoverUrl);
assertNotNull("Expected to get a valid connection to primary", connection);
assertProducingConsuming(connection);
}
@@ -124,7 +114,7 @@ public class TwoNodeTest extends QpidBrokerTestCase
try
{
- Connection connection = getConnection(_brokerFailoverUrl);
+ Connection connection = getConnection(_negativeFailoverUrl);
assertProducingConsuming(connection);
fail("Exception not thrown");
}
@@ -143,7 +133,7 @@ public class TwoNodeTest extends QpidBrokerTestCase
try
{
- getConnection(_brokerFailoverUrl);
+ getConnection(_negativeFailoverUrl);
fail("Connection not expected");
}
catch (JMSException e)
@@ -155,41 +145,39 @@ public class TwoNodeTest extends QpidBrokerTestCase
public void testInitialDesignatedPrimaryStateOfNodes() throws Exception
{
startCluster(true);
- final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfPrimary());
- assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary());
- final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfSecondaryNode());
- assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary());
+ Map<String, Object> primaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary());
+ assertTrue("Expected primary node to be set as designated primary",
+ (Boolean) primaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
+
+ Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+ assertFalse("Expected secondary node to NOT be set as designated primary",
+ (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
}
public void testSecondaryDesignatedAsPrimaryAfterOriginalPrimaryStopped() throws Exception
{
startCluster(true);
- final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+
_groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary());
- assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary());
- storeBean.setDesignatedPrimary(true);
+ Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+ assertFalse("Expected node to NOT be set as designated primary", (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
+
+ _groupCreator.setNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode(), Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true));
- long limit = System.currentTimeMillis() + 5000;
- while( !storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit)
+ int timeout = 5000;
+ long limit = System.currentTimeMillis() + timeout;
+ while( !((Boolean)secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)) && System.currentTimeMillis() < limit)
{
Thread.sleep(100);
+ secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode());
}
- assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary());
+ assertTrue("Expected secondary to transition to primary within " + timeout, (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
- final Connection connection = getConnection(_brokerFailoverUrl);
+ final Connection connection = getConnection(_positiveFailoverUrl);
assertNotNull("Expected to get a valid connection to new primary", connection);
assertProducingConsuming(connection);
}
- private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(
- final int activeBrokerPortNumber) throws Exception
- {
- _jmxUtils.open(activeBrokerPortNumber);
-
- ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
- return storeBean;
- }
-
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index fa4e3f21dd..597fc44e4c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -281,4 +281,10 @@ class HeadersBinding
return true;
}
+
+ @Override
+ public int hashCode()
+ {
+ return _binding == null ? 0 : _binding.hashCode();
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java
index 9e497efcd2..b864a8c095 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailabilityMessages.java
@@ -44,15 +44,15 @@ public class HighAvailabilityMessages
private static Locale _currentLocale = BrokerProperties.getLocale();
public static final String HIGHAVAILABILITY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability";
- public static final String STOPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.stopped";
public static final String INTRUDER_DETECTED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.intruder_detected";
- public static final String STARTED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.started";
public static final String TRANSFER_MASTER_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.transfer_master";
public static final String QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.quorum_override_changed";
- public static final String DETACHED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.detached";
- public static final String MAJORITY_LOST_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.majority_lost";
+ public static final String REMOVED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.removed";
+ public static final String LEFT_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.left";
+ public static final String JOINED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.joined";
+ public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.created";
+ public static final String QUORUM_LOST_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.quorum_lost";
public static final String PRIORITY_CHANGED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.priority_changed";
- public static final String ATTACHED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.attached";
public static final String ADDED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.added";
public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.deleted";
public static final String ROLE_CHANGED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "highavailability.role_changed";
@@ -61,15 +61,15 @@ public class HighAvailabilityMessages
static
{
Logger.getLogger(HIGHAVAILABILITY_LOG_HIERARCHY);
- Logger.getLogger(STOPPED_LOG_HIERARCHY);
Logger.getLogger(INTRUDER_DETECTED_LOG_HIERARCHY);
- Logger.getLogger(STARTED_LOG_HIERARCHY);
Logger.getLogger(TRANSFER_MASTER_LOG_HIERARCHY);
Logger.getLogger(QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY);
- Logger.getLogger(DETACHED_LOG_HIERARCHY);
- Logger.getLogger(MAJORITY_LOST_LOG_HIERARCHY);
+ Logger.getLogger(REMOVED_LOG_HIERARCHY);
+ Logger.getLogger(LEFT_LOG_HIERARCHY);
+ Logger.getLogger(JOINED_LOG_HIERARCHY);
+ Logger.getLogger(CREATED_LOG_HIERARCHY);
+ Logger.getLogger(QUORUM_LOST_LOG_HIERARCHY);
Logger.getLogger(PRIORITY_CHANGED_LOG_HIERARCHY);
- Logger.getLogger(ATTACHED_LOG_HIERARCHY);
Logger.getLogger(ADDED_LOG_HIERARCHY);
Logger.getLogger(DELETED_LOG_HIERARCHY);
Logger.getLogger(ROLE_CHANGED_LOG_HIERARCHY);
@@ -80,14 +80,14 @@ public class HighAvailabilityMessages
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1012 : The node ''{0}'' from the replication group ''{1}'' is stopped.</pre>
+ * <pre>HA-1008 : Intruder detected : Node ''{0}'' ({1})</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage STOPPED(String param1, String param2)
+ public static LogMessage INTRUDER_DETECTED(String param1, String param2)
{
- String rawMessage = _messages.getString("STOPPED");
+ String rawMessage = _messages.getString("INTRUDER_DETECTED");
final Object[] messageArguments = {param1, param2};
// Create a new MessageFormat to ensure thread safety.
@@ -105,23 +105,23 @@ public class HighAvailabilityMessages
public String getLogHierarchy()
{
- return STOPPED_LOG_HIERARCHY;
+ return INTRUDER_DETECTED_LOG_HIERARCHY;
}
};
}
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1007: Intruder node ''{0}'' from ''{1}'' is detected in replication group ''{2}''</pre>
+ * <pre>HA-1007 : Master transfer requested : to ''{0}'' ({1})</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage INTRUDER_DETECTED(String param1, String param2, String param3)
+ public static LogMessage TRANSFER_MASTER(String param1, String param2)
{
- String rawMessage = _messages.getString("INTRUDER_DETECTED");
+ String rawMessage = _messages.getString("TRANSFER_MASTER");
- final Object[] messageArguments = {param1, param2, param3};
+ final Object[] messageArguments = {param1, param2};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -137,23 +137,23 @@ public class HighAvailabilityMessages
public String getLogHierarchy()
{
- return INTRUDER_DETECTED_LOG_HIERARCHY;
+ return TRANSFER_MASTER_LOG_HIERARCHY;
}
};
}
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1013 : The node ''{0}'' from the replication group ''{1}'' is started.</pre>
+ * <pre>HA-1011 : Minimum group : {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage STARTED(String param1, String param2)
+ public static LogMessage QUORUM_OVERRIDE_CHANGED(String param1)
{
- String rawMessage = _messages.getString("STARTED");
+ String rawMessage = _messages.getString("QUORUM_OVERRIDE_CHANGED");
- final Object[] messageArguments = {param1, param2};
+ final Object[] messageArguments = {param1};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -169,23 +169,23 @@ public class HighAvailabilityMessages
public String getLogHierarchy()
{
- return STARTED_LOG_HIERARCHY;
+ return QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY;
}
};
}
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1014 : Transfer master to ''{0}'' is requested on node ''{1}'' from the replication group ''{2}''.</pre>
+ * <pre>HA-1004 : Removed : Node : ''{0}'' ({1})</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage TRANSFER_MASTER(String param1, String param2, String param3)
+ public static LogMessage REMOVED(String param1, String param2)
{
- String rawMessage = _messages.getString("TRANSFER_MASTER");
+ String rawMessage = _messages.getString("REMOVED");
- final Object[] messageArguments = {param1, param2, param3};
+ final Object[] messageArguments = {param1, param2};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -201,23 +201,23 @@ public class HighAvailabilityMessages
public String getLogHierarchy()
{
- return TRANSFER_MASTER_LOG_HIERARCHY;
+ return REMOVED_LOG_HIERARCHY;
}
};
}
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1009 : The quorum override attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''.</pre>
+ * <pre>HA-1006 : Left : Node : ''{0}'' ({1})</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage QUORUM_OVERRIDE_CHANGED(String param1, String param2, String param3)
+ public static LogMessage LEFT(String param1, String param2)
{
- String rawMessage = _messages.getString("QUORUM_OVERRIDE_CHANGED");
+ String rawMessage = _messages.getString("LEFT");
- final Object[] messageArguments = {param1, param2, param3};
+ final Object[] messageArguments = {param1, param2};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -233,21 +233,21 @@ public class HighAvailabilityMessages
public String getLogHierarchy()
{
- return QUORUM_OVERRIDE_CHANGED_LOG_HIERARCHY;
+ return LEFT_LOG_HIERARCHY;
}
};
}
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1003 : The node ''{0}'' detached from the replication group ''{1}''.</pre>
+ * <pre>HA-1005 : Joined : Node : ''{0}'' ({1})</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage DETACHED(String param1, String param2)
+ public static LogMessage JOINED(String param1, String param2)
{
- String rawMessage = _messages.getString("DETACHED");
+ String rawMessage = _messages.getString("JOINED");
final Object[] messageArguments = {param1, param2};
// Create a new MessageFormat to ensure thread safety.
@@ -265,28 +265,23 @@ public class HighAvailabilityMessages
public String getLogHierarchy()
{
- return DETACHED_LOG_HIERARCHY;
+ return JOINED_LOG_HIERARCHY;
}
};
}
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1006 : A majority of nodes from replication group ''{0}'' is not available for node ''{1}''.</pre>
+ * <pre>HA-1001 : Created</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage MAJORITY_LOST(String param1, String param2)
+ public static LogMessage CREATED()
{
- String rawMessage = _messages.getString("MAJORITY_LOST");
+ String rawMessage = _messages.getString("CREATED");
- final Object[] messageArguments = {param1, param2};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
+ final String message = rawMessage;
return new LogMessage()
{
@@ -297,28 +292,23 @@ public class HighAvailabilityMessages
public String getLogHierarchy()
{
- return MAJORITY_LOST_LOG_HIERARCHY;
+ return CREATED_LOG_HIERARCHY;
}
};
}
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1008 : The priority attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''.</pre>
+ * <pre>HA-1009 : Insufficient replicas contactable</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage PRIORITY_CHANGED(String param1, String param2, String param3)
+ public static LogMessage QUORUM_LOST()
{
- String rawMessage = _messages.getString("PRIORITY_CHANGED");
+ String rawMessage = _messages.getString("QUORUM_LOST");
- final Object[] messageArguments = {param1, param2, param3};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
+ final String message = rawMessage;
return new LogMessage()
{
@@ -329,23 +319,23 @@ public class HighAvailabilityMessages
public String getLogHierarchy()
{
- return PRIORITY_CHANGED_LOG_HIERARCHY;
+ return QUORUM_LOST_LOG_HIERARCHY;
}
};
}
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1004 : The node ''{0}'' attached to the replication group ''{1}'' with role ''{2}''.</pre>
+ * <pre>HA-1012 : Priority : {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage ATTACHED(String param1, String param2, String param3)
+ public static LogMessage PRIORITY_CHANGED(String param1)
{
- String rawMessage = _messages.getString("ATTACHED");
+ String rawMessage = _messages.getString("PRIORITY_CHANGED");
- final Object[] messageArguments = {param1, param2, param3};
+ final Object[] messageArguments = {param1};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -361,14 +351,14 @@ public class HighAvailabilityMessages
public String getLogHierarchy()
{
- return ATTACHED_LOG_HIERARCHY;
+ return PRIORITY_CHANGED_LOG_HIERARCHY;
}
};
}
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1001 : A new node ''{0}'' is added into a replication group ''{1}''.</pre>
+ * <pre>HA-1003 : Added : Node : ''{0}'' ({1})</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
@@ -400,21 +390,16 @@ public class HighAvailabilityMessages
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1002 : An existing node ''{0}'' is removed from the replication group ''{1}''.</pre>
+ * <pre>HA-1002 : Deleted</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage DELETED(String param1, String param2)
+ public static LogMessage DELETED()
{
String rawMessage = _messages.getString("DELETED");
- final Object[] messageArguments = {param1, param2};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
+ final String message = rawMessage;
return new LogMessage()
{
@@ -432,7 +417,7 @@ public class HighAvailabilityMessages
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1005 : The role of the node ''{0}'' from the replication group ''{1}'' has changed from ''{2}'' to ''{3}''.</pre>
+ * <pre>HA-1010 : Role change reported: Node : ''{0}'' ({1}) : from ''{2}'' to ''{3}''</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
@@ -464,16 +449,16 @@ public class HighAvailabilityMessages
/**
* Log a HighAvailability message of the Format:
- * <pre>HA-1010 : The designated primary attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''.</pre>
+ * <pre>HA-1013 : Designated primary : {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage DESIGNATED_PRIMARY_CHANGED(String param1, String param2, String param3)
+ public static LogMessage DESIGNATED_PRIMARY_CHANGED(String param1)
{
String rawMessage = _messages.getString("DESIGNATED_PRIMARY_CHANGED");
- final Object[] messageArguments = {param1, param2, param3};
+ final Object[] messageArguments = {param1};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties
index 94df7cc38b..3c5b0d260f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/HighAvailability_logmessages.properties
@@ -18,18 +18,47 @@
#
# HA logging message i18n strings.
-ADDED=HA-1001 : A new node ''{0}'' is added into a replication group ''{1}''.
-DELETED=HA-1002 : An existing node ''{0}'' is removed from the replication group ''{1}''.
-DETACHED=HA-1003 : The node ''{0}'' detached from the replication group ''{1}''.
-ATTACHED=HA-1004 : The node ''{0}'' attached to the replication group ''{1}'' with role ''{2}''.
-ROLE_CHANGED=HA-1005 : The role of the node ''{0}'' from the replication group ''{1}'' has changed from ''{2}'' to ''{3}''.
-MAJORITY_LOST=HA-1006 : A majority of nodes from replication group ''{0}'' is not available for node ''{1}''.
-INTRUDER_DETECTED=HA-1007: Intruder node ''{0}'' from ''{1}'' is detected in replication group ''{2}''
-PRIORITY_CHANGED=HA-1008 : The priority attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''.
-QUORUM_OVERRIDE_CHANGED=HA-1009 : The quorum override attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''.
-DESIGNATED_PRIMARY_CHANGED=HA-1010 : The designated primary attribute of node ''{0}'' from the replication group ''{1}'' is set to ''{2}''.
-STOPPED=HA-1011 : The node ''{0}'' from the replication group ''{1}'' is stopped.
-STARTED=HA-1012 : The node ''{0}'' from the replication group ''{1}'' is started.
-TRANSFER_MASTER=HA-1013 : Transfer master to ''{0}'' is requested on node ''{1}'' from the replication group ''{2}''.
+CREATED = HA-1001 : Created
+DELETED = HA-1002 : Deleted
+
+# 0 - Node name
+# 1 - Node address
+ADDED = HA-1003 : Added : Node : ''{0}'' ({1})
+
+# 0 - Node name
+# 1 - Node address
+REMOVED = HA-1004 : Removed : Node : ''{0}'' ({1})
+
+# 0 - Node name
+# 1 - Node address
+JOINED = HA-1005 : Joined : Node : ''{0}'' ({1})
+
+# 0 - Node name
+# 1 - Node address
+LEFT = HA-1006 : Left : Node : ''{0}'' ({1})
+
+# 0 - Node name
+# 1 - Node address
+TRANSFER_MASTER = HA-1007 : Master transfer requested : to ''{0}'' ({1})
+
+# 0 - Node name
+# 1 - Node address
+INTRUDER_DETECTED = HA-1008 : Intruder detected : Node ''{0}'' ({1})
+QUORUM_LOST = HA-1009 : Insufficient replicas contactable
+
+# 0 - Node name
+# 1 - Node address
+# 2 - Previous role value
+# 3 - New role value
+ROLE_CHANGED = HA-1010 : Role change reported: Node : ''{0}'' ({1}) : from ''{2}'' to ''{3}''
+
+# 0 - new value
+QUORUM_OVERRIDE_CHANGED = HA-1011 : Minimum group : {0}
+
+# 0 - new value
+PRIORITY_CHANGED = HA-1012 : Priority : {0}
+
+# 0 - new value
+DESIGNATED_PRIMARY_CHANGED = HA-1013 : Designated primary : {0}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java
index edb78369ae..d59a09fce9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java
@@ -116,5 +116,4 @@ public class LogSubjectFormat
*/
public static final String QUEUE_FORMAT = "vh(/{0})/qu({1})";
- public static final String VIRTUAL_HOST_NODE_FORMAT = "vhn(/{0}))";
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 6c8945582c..f944821c6f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -318,7 +318,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
checkCandidate((Class<? extends ConfiguredObject>) interfaceClass, candidates);
}
}
- if(clazz.getSuperclass() != null & ConfiguredObject.class.isAssignableFrom(clazz.getSuperclass()))
+ if(clazz.getSuperclass() != null && ConfiguredObject.class.isAssignableFrom(clazz.getSuperclass()))
{
findBestFitInterface((Class<? extends ConfiguredObject>) clazz.getSuperclass(), candidates);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
index 0f4ecb09dc..b0dda69ee6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
@@ -186,6 +186,7 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>>
ConfiguredObjectRecordConverter converter = new ConfiguredObjectRecordConverter(BrokerModel.getInstance());
Reader reader;
+
try
{
URL url = new URL(initialConfigurationLocation);
@@ -196,9 +197,18 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>>
reader = new FileReader(initialConfigurationLocation);
}
- Collection<ConfiguredObjectRecord> records = converter.readFromJson(org.apache.qpid.server.model.Broker.class,
- systemConfig, reader);
- return records.toArray(new ConfiguredObjectRecord[records.size()]);
+ try
+ {
+ Collection<ConfiguredObjectRecord> records =
+ converter.readFromJson(org.apache.qpid.server.model.Broker.class,
+ systemConfig, reader);
+ return records.toArray(new ConfiguredObjectRecord[records.size()]);
+ }
+ finally
+ {
+ reader.close();
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 545a1d941d..c49c2790df 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -2925,7 +2925,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if(existingPolicy != _exclusive)
{
ExclusivityPolicy newPolicy = _exclusive;
- _exclusive = newPolicy;
+ _exclusive = existingPolicy;
updateExclusivityPolicy(newPolicy);
}
return true;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 57142e6e1f..d0acf1a46f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -1564,7 +1564,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
data = new byte[0];
}
}
- return ByteBuffer.wrap(data,offsetInMessage,size);
+ return ByteBuffer.wrap(data,offsetInMessage,Math.min(size,data.length-offsetInMessage));
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
index 38101525cd..ad9df793c8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
@@ -24,7 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.logging.subjects.VirtualHostNodeLogSubject;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -54,7 +53,6 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
private final Broker<?> _broker;
private final AtomicReference<State> _state = new AtomicReference<State>(State.UNINITIALIZED);
private final EventLogger _eventLogger;
- private final VirtualHostNodeLogSubject _virtualHostNodeLogSubject;
private DurableConfigurationStore _durableConfigurationStore;
@@ -67,7 +65,6 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
_broker = parent;
SystemConfig<?> systemConfig = _broker.getParent(SystemConfig.class);
_eventLogger = systemConfig.getEventLogger();
- _virtualHostNodeLogSubject = new VirtualHostNodeLogSubject(getName());
}
@@ -248,8 +245,4 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
protected abstract void activate();
- public VirtualHostNodeLogSubject getVirtualHostNodeLogSubject()
- {
- return _virtualHostNodeLogSubject;
- }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 7877812d84..49bc26149e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -474,13 +474,12 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
else
{
- getVirtualHost().getEventLogger().message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(),
- _currentMessage.getMessagePublishInfo().getRoutingKey()
- == null
- ? null
- : _currentMessage.getMessagePublishInfo()
- .getRoutingKey()
- .toString()));
+ AMQShortString exchangeName = _currentMessage.getExchangeName();
+ AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey();
+
+ getVirtualHost().getEventLogger().message(
+ ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(),
+ routingKey == null ? null : routingKey.asString()));
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
index 1bd9ab079e..c33af48d8e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
@@ -158,7 +158,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
acknowledged.add(instance);
}
}
- return ackedMessageMap.values();
+ return acknowledged;
}
private void collect(long key, Map<Long, MessageInstance> msgs)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
new file mode 100644
index 0000000000..ca52173e66
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.message.MessageInstance;
+
+public class UnacknowledgedMessageMapTest extends TestCase
+{
+ public void testDeletedMessagesCantBeAcknowledged()
+ {
+ UnacknowledgedMessageMap map = new UnacknowledgedMessageMapImpl(100);
+ final int expectedSize = 5;
+ MessageInstance[] msgs = populateMap(map,expectedSize);
+ assertEquals(expectedSize,map.size());
+ Collection<MessageInstance> acknowledged = map.acknowledge(100, true);
+ assertEquals(expectedSize, acknowledged.size());
+ assertEquals(0,map.size());
+ for(int i = 0; i < expectedSize; i++)
+ {
+ assertTrue("Message " + i + " is missing", acknowledged.contains(msgs[i]));
+ }
+
+ map = new UnacknowledgedMessageMapImpl(100);
+ msgs = populateMap(map,expectedSize);
+ // simulate some messages being ttl expired
+ when(msgs[2].lockAcquisition()).thenReturn(Boolean.FALSE);
+ when(msgs[4].lockAcquisition()).thenReturn(Boolean.FALSE);
+
+ assertEquals(expectedSize,map.size());
+
+
+ acknowledged = map.acknowledge(100, true);
+ assertEquals(expectedSize-2, acknowledged.size());
+ assertEquals(0,map.size());
+ for(int i = 0; i < expectedSize; i++)
+ {
+ assertEquals(i != 2 && i != 4, acknowledged.contains(msgs[i]));
+ }
+
+ }
+
+ public MessageInstance[] populateMap(final UnacknowledgedMessageMap map, int size)
+ {
+ MessageInstance[] msgs = new MessageInstance[size];
+ for(int i = 0; i < size; i++)
+ {
+ msgs[i] = createMessageInstance(i);
+ map.add((long)i,msgs[i]);
+ }
+ return msgs;
+ }
+
+ private MessageInstance createMessageInstance(final int id)
+ {
+ MessageInstance instance = mock(MessageInstance.class);
+ when(instance.lockAcquisition()).thenReturn(Boolean.TRUE);
+ return instance;
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
index 3974ab0af6..266f3b6868 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
@@ -20,10 +20,32 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.UnsignedShort;
import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
import org.apache.qpid.amqp_1_0.type.messaging.Data;
@@ -32,17 +54,6 @@ import org.apache.qpid.transport.codec.BBEncoder;
import org.apache.qpid.typedmessage.TypedBytesContentWriter;
import org.apache.qpid.typedmessage.TypedBytesFormatException;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-
public class MessageConverter_from_1_0
{
private static final Charset UTF_8 = Charset.forName("UTF-8");
@@ -91,7 +102,7 @@ public class MessageConverter_from_1_0
Section firstBodySection = sections.get(0);
if(firstBodySection instanceof AmqpValue)
{
- bodyObject = fixObject(((AmqpValue)firstBodySection).getValue());
+ bodyObject = convertValue(((AmqpValue)firstBodySection).getValue());
}
else if(firstBodySection instanceof Data)
{
@@ -115,7 +126,7 @@ public class MessageConverter_from_1_0
{
totalSequence.addAll(((AmqpSequence)section).getValue());
}
- bodyObject = fixObject(totalSequence);
+ bodyObject = convertValue(totalSequence);
}
}
@@ -127,40 +138,94 @@ public class MessageConverter_from_1_0
return bodyObject;
}
- private static Object fixObject(final Object value)
+ private static final Set<Class> STANDARD_TYPES = new HashSet<>(Arrays.<Class>asList(Boolean.class,
+ Byte.class,
+ Short.class,
+ Integer.class,
+ Long.class,
+ Float.class,
+ Double.class,
+ Character.class,
+ String.class,
+ byte[].class,
+ UUID.class));
+
+ private static Map convertMap(final Map map)
{
- if(value instanceof Binary)
+ Map resultMap = new LinkedHashMap();
+ Iterator<Map.Entry> iterator = map.entrySet().iterator();
+ while(iterator.hasNext())
{
- final Binary binaryValue = (Binary) value;
- byte[] data = new byte[binaryValue.getLength()];
- binaryValue.asByteBuffer().get(data);
- return data;
+ Map.Entry entry = iterator.next();
+ resultMap.put(convertValue(entry.getKey()), convertValue(entry.getValue()));
+
}
- else if(value instanceof List)
+ return resultMap;
+ }
+
+ public static Object convertValue(final Object value)
+ {
+ if(value != null && !STANDARD_TYPES.contains(value))
{
- List listValue = (List) value;
- List fixedValue = new ArrayList(listValue.size());
- for(Object o : listValue)
+ if(value instanceof Map)
{
- fixedValue.add(fixObject(o));
+ return convertMap((Map)value);
}
- return fixedValue;
- }
- else if(value instanceof Map)
- {
- Map<?,?> mapValue = (Map) value;
- Map fixedValue = new LinkedHashMap(mapValue.size());
- for(Map.Entry<?,?> entry : mapValue.entrySet())
+ else if(value instanceof List)
+ {
+ return convertList((List)value);
+ }
+ else if(value instanceof UnsignedByte)
+ {
+ return ((UnsignedByte)value).shortValue();
+ }
+ else if(value instanceof UnsignedShort)
+ {
+ return ((UnsignedShort)value).intValue();
+ }
+ else if(value instanceof UnsignedInteger)
+ {
+ return ((UnsignedInteger)value).longValue();
+ }
+ else if(value instanceof UnsignedLong)
+ {
+ return ((UnsignedLong)value).longValue();
+ }
+ else if(value instanceof Symbol)
+ {
+ return value.toString();
+ }
+ else if(value instanceof Date)
{
- fixedValue.put(fixObject(entry.getKey()),fixObject(entry.getValue()));
+ return ((Date)value).getTime();
+ }
+ else if(value instanceof Binary)
+ {
+ Binary binary = (Binary)value;
+ byte[] data = new byte[binary.getLength()];
+ binary.asByteBuffer().get(data);
+ return data;
+ }
+ else
+ {
+ // Throw exception instead?
+ return value.toString();
}
- return fixedValue;
}
else
{
return value;
}
+ }
+ private static List convertList(final List list)
+ {
+ List result = new ArrayList(list.size());
+ for(Object entry : list)
+ {
+ result.add(convertValue(entry));
+ }
+ return result;
}
public static byte[] convertToBody(Object object)
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index 8d77a8cfaf..54d4638bb8 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.converter.v0_10_v1_0;
import java.nio.ByteBuffer;
+import java.util.Map;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageConverter;
@@ -176,7 +177,8 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
messageProps.setReplyTo(replyTo);
}
- messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeadersAsMap());
+ messageProps.setApplicationHeaders((Map<String, Object>) MessageConverter_from_1_0.convertValue(serverMsg.getMessageHeader()
+ .getHeadersAsMap()));
Header header = new Header(deliveryProps, messageProps, null);
return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 5b1c25e879..2de21e1a9f 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -194,7 +194,7 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
for(String headerName : serverMsg.getMessageHeader().getHeaderNames())
{
- headerProps.put(headerName, serverMsg.getMessageHeader().getHeader(headerName));
+ headerProps.put(headerName, MessageConverter_from_1_0.convertValue(serverMsg.getMessageHeader().getHeader(headerName)));
}
props.setHeaders(FieldTable.convertToFieldTable(headerProps));
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/DefinedFileServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/DefinedFileServlet.java
index d8f8e4e4b0..2f6f55e042 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/DefinedFileServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/DefinedFileServlet.java
@@ -61,23 +61,24 @@ public class DefinedFileServlet extends HttpServlet
{
try (OutputStream output = HttpManagementUtil.getOutputStream(request, response))
{
- InputStream fileInput = getClass().getResourceAsStream("/resources/" + _filename);
-
- if (fileInput != null)
+ try(InputStream fileInput = getClass().getResourceAsStream("/resources/" + _filename))
{
- byte[] buffer = new byte[1024];
- response.setStatus(HttpServletResponse.SC_OK);
- int read = 0;
+ if (fileInput != null)
+ {
+ byte[] buffer = new byte[1024];
+ response.setStatus(HttpServletResponse.SC_OK);
+ int read = 0;
- while ((read = fileInput.read(buffer)) > 0)
+ while ((read = fileInput.read(buffer)) > 0)
+ {
+ output.write(buffer, 0, read);
+ }
+ }
+ else
{
- output.write(buffer, 0, read);
+ response.sendError(HttpServletResponse.SC_NOT_FOUND, "unknown file: " + _filename);
}
}
- else
- {
- response.sendError(HttpServletResponse.SC_NOT_FOUND, "unknown file: " + _filename);
- }
}
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 945645ccb1..35252204ac 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -2101,7 +2101,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void deregisterProducer(long producerId)
{
- _producers.remove(new Long(producerId));
+ _producers.remove(producerId);
}
boolean isInRecovery()
@@ -2935,7 +2935,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void registerProducer(long producerId, MessageProducer producer)
{
- _producers.put(new Long(producerId), producer);
+ _producers.put(producerId, producer);
}
/**
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 3ff7416d8f..dbbc300910 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -26,6 +26,14 @@ import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTRO
import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,13 +60,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.transport.TransportException;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
/** Used for debugging. */
@@ -736,10 +737,10 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
boolean isConsumer,
boolean noLocal) throws AMQException
{
- throw new UnsupportedOperationException("The new addressing based syntax is "
- + "not supported for AMQP 0-8/0-9 versions");
+ throw new UnsupportedAddressSyntaxException(dest);
}
+
protected void flushAcknowledgments()
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 3cb723e5a8..b9bb03444f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -20,6 +20,17 @@
*/
package org.apache.qpid.client;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.Topic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -33,16 +44,6 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.MethodRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Queue;
-import javax.jms.Topic;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
public class BasicMessageProducer_0_8 extends BasicMessageProducer
{
@@ -57,6 +58,12 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
void declareDestination(AMQDestination destination)
{
+
+ if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ throw new UnsupportedAddressSyntaxException(destination);
+ }
+
if(getSession().isDeclareExchanges())
{
final MethodRegistry methodRegistry = getSession().getMethodRegistry();
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java b/qpid/java/client/src/main/java/org/apache/qpid/client/UnsupportedAddressSyntaxException.java
index 1291380311..c65fd7c189 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/UnsupportedAddressSyntaxException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,35 +18,15 @@
* under the License.
*
*/
-package org.apache.qpid.server.util;
-
-import java.util.concurrent.Callable;
+package org.apache.qpid.client;
-public abstract class TimedRun implements Callable<Long>
+class UnsupportedAddressSyntaxException extends UnsupportedOperationException
{
- private final String description;
-
- public TimedRun(String description)
+ UnsupportedAddressSyntaxException(final AMQDestination dest)
{
- this.description = description;
+ super("The address '" + dest.toString() + "' uses the " + AMQDestination.DestSyntax.ADDR + " addressing syntax"
+ + " which is not supported for AMQP 0-8/0-9/0-9-1 connections. Use the " + AMQDestination.DestSyntax.BURL
+ + " syntax instead:\n"
+ + "\tBURL:<Exchange Class>://<Exchange Name>/[<Destination>]/[<Queue>][?<option>='<value>'[&<option>='<value>']]\n");
}
-
- public Long call() throws Exception
- {
- setup();
- long start = System.currentTimeMillis();
- run();
- long stop = System.currentTimeMillis();
- teardown();
- return stop - start;
- }
-
- public String toString()
- {
- return description;
- }
-
- protected void setup() throws Exception{}
- protected void teardown() throws Exception{}
- protected abstract void run() throws Exception;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
index b039d8b005..808b2781c4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.jms;
-import org.apache.qpid.transport.ConnectionSettings;
-
import java.util.Map;
+import org.apache.qpid.transport.ConnectionSettings;
+
public interface BrokerDetails
{
@@ -104,9 +104,5 @@ public interface BrokerDetails
boolean getBooleanProperty(String propName);
- String toString();
-
- boolean equals(Object o);
-
ConnectionSettings buildConnectionSettings();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java
index 978ad3af7d..9d8509b303 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java
@@ -50,7 +50,7 @@ public class ConstantExpression implements Expression
public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE);
public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE);
- private Object value;
+ private Object _value;
public static ConstantExpression createFromDecimal(String text)
{
@@ -64,7 +64,7 @@ public class ConstantExpression implements Expression
Number value;
try
{
- value = new Long(text);
+ value = Long.valueOf(text);
}
catch (NumberFormatException e)
{
@@ -114,17 +114,17 @@ public class ConstantExpression implements Expression
public ConstantExpression(Object value)
{
- this.value = value;
+ this._value = value;
}
public Object evaluate(FilterableMessage message)
{
- return value;
+ return _value;
}
public Object getValue()
{
- return value;
+ return _value;
}
/**
@@ -132,22 +132,22 @@ public class ConstantExpression implements Expression
*/
public String toString()
{
- if (value == null)
+ if (_value == null)
{
return "NULL";
}
- if (value instanceof Boolean)
+ if (_value instanceof Boolean)
{
- return ((Boolean) value) ? "TRUE" : "FALSE";
+ return ((Boolean) _value) ? "TRUE" : "FALSE";
}
- if (value instanceof String)
+ if (_value instanceof String)
{
- return encodeString((String) value);
+ return encodeString((String) _value);
}
- return value.toString();
+ return _value.toString();
}
/**
@@ -186,7 +186,7 @@ public class ConstantExpression implements Expression
*/
public static String encodeString(String s)
{
- StringBuffer b = new StringBuffer();
+ StringBuilder b = new StringBuilder();
b.append('\'');
for (int i = 0; i < s.length(); i++)
{
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/PlainSaslClient.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/PlainSaslClient.java
index 01af41c2c9..bee89fcc66 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/PlainSaslClient.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/PlainSaslClient.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.management.common.sasl;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -28,8 +31,6 @@ import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
public class PlainSaslClient implements SaslClient
{
@@ -170,9 +171,10 @@ public class PlainSaslClient implements SaslClient
clearPassword();
}
- protected void finalize()
+ protected void finalize() throws Throwable
{
clearPassword();
+ super.finalize();
}
private Object[] getUserInfo() throws SaslException
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UserPasswordCallbackHandler.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UserPasswordCallbackHandler.java
index caee5d481a..3778e39f62 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UserPasswordCallbackHandler.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UserPasswordCallbackHandler.java
@@ -19,12 +19,13 @@
*/
package org.apache.qpid.management.common.sasl;
+import java.io.IOException;
+
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
-import java.io.IOException;
public class UserPasswordCallbackHandler implements CallbackHandler
{
@@ -70,8 +71,9 @@ public class UserPasswordCallbackHandler implements CallbackHandler
}
}
- protected void finalize()
+ protected void finalize() throws Throwable
{
clearPassword();
+ super.finalize();
}
}
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UsernameHashedPasswordCallbackHandler.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UsernameHashedPasswordCallbackHandler.java
index 314ef70144..5f12fa9dfc 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UsernameHashedPasswordCallbackHandler.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/sasl/UsernameHashedPasswordCallbackHandler.java
@@ -20,15 +20,16 @@
*/
package org.apache.qpid.management.common.sasl;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
public class UsernameHashedPasswordCallbackHandler implements CallbackHandler
@@ -76,9 +77,10 @@ public class UsernameHashedPasswordCallbackHandler implements CallbackHandler
}
}
- protected void finalize()
+ protected void finalize() throws Throwable
{
clearPassword();
+ super.finalize();
}
public static char[] getHash(String text) throws NoSuchAlgorithmException, UnsupportedEncodingException
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java
index e0ae137c35..1cb938e915 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java
@@ -102,27 +102,27 @@ public abstract class NumericGeneratedPropertySupport extends GeneratedPropertyS
Number result = null;
if (targetType == double.class)
{
- result = new Double(value);
+ result = value;
}
else if (targetType == float.class)
{
- result = new Float(value);
+ result = (float) value;
}
else if (targetType == int.class)
{
- result = new Integer((int) value);
+ result = (int) value;
}
else if (targetType == long.class)
{
- result = new Long((long) value);
+ result = (long) value;
}
else if (targetType == short.class)
{
- result = new Short((short) value);
+ result = (short) value;
}
else if (targetType == byte.class)
{
- result = new Byte((byte) value);
+ result = (byte) value;
}
else
{
diff --git a/qpid/java/pom.xml b/qpid/java/pom.xml
index 1be2ce0f95..b30472ce88 100644
--- a/qpid/java/pom.xml
+++ b/qpid/java/pom.xml
@@ -54,7 +54,6 @@
<qpid.home.qbtc.output>${qpid.home}${file.separator}target${file.separator}qbtc-output</qpid.home.qbtc.output> <!-- override for broker tests -->
<qpid.work>${project.build.directory}${file.separator}QPID_WORK</qpid.work>
- <argLine />
<profile>java-mms.0-10</profile>
<profile.broker.language>java</profile.broker.language>
<profile.broker.type>internal</profile.broker.type>
diff --git a/qpid/java/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
index 5a3c0182fd..845e7e58c3 100644
--- a/qpid/java/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/qpid/java/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -90,16 +90,18 @@ public class QpidTestCase extends TestCase
if (file.exists())
{
_logger.info("Using exclude file: " + uri);
- try
+ try(FileReader fileReader = new FileReader(file))
{
- BufferedReader in = new BufferedReader(new FileReader(file));
- String excludedTest = in.readLine();
- do
+ try(BufferedReader in = new BufferedReader(fileReader))
{
- exclusionList.add(excludedTest);
- excludedTest = in.readLine();
+ String excludedTest = in.readLine();
+ do
+ {
+ exclusionList.add(excludedTest);
+ excludedTest = in.readLine();
+ }
+ while (excludedTest != null);
}
- while (excludedTest != null);
}
catch (IOException e)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
index fce47f9986..70e1b27fba 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.MalformedURLException;
@@ -114,7 +115,10 @@ public class TestBrokerConfiguration
try
{
URL url = new URL(initialStoreLocation);
- reader = new InputStreamReader(url.openStream());
+ try(InputStream urlStream = url.openStream())
+ {
+ reader = new InputStreamReader(urlStream);
+ }
}
catch (MalformedURLException e)
{
@@ -122,6 +126,8 @@ public class TestBrokerConfiguration
}
Collection<ConfiguredObjectRecord> records = converter.readFromJson(org.apache.qpid.server.model.Broker.class, parentObject, reader);
+ reader.close();
+
_store = new AbstractMemoryStore(Broker.class){};
ConfiguredObjectRecord[] initialRecords = records.toArray(new ConfiguredObjectRecord[records.size()]);
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java
deleted file mode 100644
index 941c1d9499..0000000000
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import java.util.Collection;
-import java.util.concurrent.Callable;
-
-import org.apache.log4j.Logger;
-
-public class AveragedRun implements Callable<RunStats>
-{
- private static final Logger _logger = Logger.getLogger(AveragedRun.class);
-
- private final RunStats stats = new RunStats();
- private final TimedRun test;
- private final int iterations;
-
- public AveragedRun(TimedRun test, int iterations)
- {
- this.test = test;
- this.iterations = iterations;
- }
-
- public RunStats call() throws Exception
- {
- for (int i = 0; i < iterations; i++)
- {
- stats.record(test.call());
- }
- return stats;
- }
-
- public void run() throws Exception
- {
- _logger.info(test + ": " + call());
- }
-
- public String toString()
- {
- return test.toString();
- }
-
- static void run(Collection<AveragedRun> tests) throws Exception
- {
- for(AveragedRun test : tests)
- {
- test.run();
- }
- }
-}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/ConversationFactory.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/ConversationFactory.java
deleted file mode 100644
index 3a9354d822..0000000000
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/ConversationFactory.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.utils;
-
-import org.apache.log4j.Logger;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
- * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant
- * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
- *
- * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a
- * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation
- * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order
- * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded
- * conversation (the conversation methods can be called many times in parallel):
- *
- * <p/><pre>
- * class Initiator
- * {
- * ConversationHelper conversation = new ConversationHelper(connection, null,
- * java.util.concurrent.LinkedBlockingQueue.class);
- *
- * initiateConversation()
- * {
- * try {
- * // Exchange greetings.
- * conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
- * Message greeting = conversation.receive();
- *
- * // Exchange goodbyes.
- * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
- * Message goodbye = conversation.receive();
- * } finally {
- * conversation.end();
- * }
- * }
- * }
- *
- * class Responder
- * {
- * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
- * java.util.concurrent.LinkedBlockingQueue.class);
- *
- * respondToConversation()
- * {
- * try {
- * // Exchange greetings.
- * Message greeting = conversation.receive();
- * conversation.send(conversation.getSession().createTextMessage("Hello."));
- *
- * // Exchange goodbyes.
- * Message goodbye = conversation.receive();
- * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
- * } finally {
- * conversation.end();
- * }
- * }
- * }
- * </pre>
- *
- * <p/>Conversation correlation id's are generated on a per thread basis.
- *
- * <p/>The same controlSession is shared amongst all conversations. Calls to send are therefore synchronized because JMS
- * sessions are not multi-threaded.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Associate messages to an ongoing conversation using correlation ids.
- * <tr><td> Auto manage sessions for conversations.
- * <tr><td> Store messages not in a conversation in dead letter box.
- * </table>
- */
-public class ConversationFactory
-{
- /** Used for debugging. */
- private static final Logger log = Logger.getLogger(ConversationFactory.class);
-
- /** Holds a map from correlation id's to queues. */
- private Map<Long, BlockingQueue<Message>> idsToQueues = new HashMap<Long, BlockingQueue<Message>>();
-
- /** Holds the connection over which the conversation is conducted. */
- private Connection connection;
-
- /** Holds the controlSession over which the conversation is conduxted. */
- private Session session;
-
- /** The message consumer for incoming messages. */
- private MessageConsumer consumer;
-
- /** The message producer for outgoing messages. */
- private MessageProducer producer;
-
- /** The well-known or temporary destination to receive replies on. */
- private Destination receiveDestination;
-
- /** Holds the queue implementation class for the reply queue. */
- private Class<? extends BlockingQueue> queueClass;
-
- /** Used to hold any replies that are received outside of the context of a conversation. */
- private BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
-
- /* Used to hold conversation state on a per thread basis. */
- /*
- ThreadLocal<Conversation> threadLocals =
- new ThreadLocal<Conversation>()
- {
- protected Conversation initialValue()
- {
- Conversation settings = new Conversation();
- settings.conversationId = conversationIdGenerator.getAndIncrement();
-
- return settings;
- }
- };
- */
-
- /** Generates new coversation id's as needed. */
- private AtomicLong conversationIdGenerator = new AtomicLong();
-
- /**
- * Creates a conversation helper on the specified connection with the default sending destination, and listening
- * to the specified receiving destination.
- *
- * @param connection The connection to build the conversation helper on.
- * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary
- * queue.
- * @param queueClass The queue implementation class.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
- public ConversationFactory(Connection connection, Destination receiveDestination,
- Class<? extends BlockingQueue> queueClass) throws JMSException
- {
- log.debug("public ConversationFactory(Connection connection, Destination receiveDestination = " + receiveDestination
- + ", Class<? extends BlockingQueue> queueClass = " + queueClass + "): called");
-
- this.connection = connection;
- this.queueClass = queueClass;
-
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Check if a well-known receive destination has been provided, or use a temporary queue if not.
- this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue();
-
- consumer = session.createConsumer(receiveDestination);
- producer = session.createProducer(null);
-
- consumer.setMessageListener(new Receiver());
- }
-
- /**
- * Creates a new conversation context.
- *
- * @return A new conversation context.
- */
- public Conversation startConversation()
- {
- log.debug("public Conversation startConversation(): called");
-
- Conversation conversation = new Conversation();
- conversation.conversationId = conversationIdGenerator.getAndIncrement();
-
- return conversation;
- }
-
- /**
- * Ensures that the reply queue for a conversation exists.
- *
- * @param conversationId The conversation correlation id.
- */
- private void initQueueForId(long conversationId)
- {
- if (!idsToQueues.containsKey(conversationId))
- {
- idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass));
- }
- }
-
- /**
- * Clears the dead letter box, returning all messages that were in it.
- *
- * @return All messages in the dead letter box.
- */
- public Collection<Message> emptyDeadLetterBox()
- {
- log.debug("public Collection<Message> emptyDeadLetterBox(): called");
-
- Collection<Message> result = new ArrayList<Message>();
- deadLetterBox.drainTo(result);
-
- return result;
- }
-
- /**
- * Gets the controlSession over which the conversation is conducted.
- *
- * @return The controlSession over which the conversation is conducted.
- */
- public Session getSession()
- {
- // Conversation settings = threadLocals.get();
-
- return session;
- }
-
- /**
- * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply
- * destination automatically updated to the last received reply-to destination.
- */
- public class Conversation
- {
- /** Holds the correlation id for the context. */
- private long conversationId;
-
- /**
- * Holds the send destination for the context. This will automatically be updated to the most recently received
- * reply-to destination.
- */
- private Destination sendDestination;
-
- /**
- * Sends a message to the default sending location. The correlation id of the message will be assigned by this
- * method, overriding any previously set value.
- *
- * @param sendDestination The destination to send to. This may be null to use the last received reply-to
- * destination.
- * @param message The message to send.
- *
- * @throws JMSException All undelying JMSExceptions are allowed to fall through. This will also be thrown if no
- * send destination is specified and there is no most recent reply-to destination available
- * to use.
- */
- public void send(Destination sendDestination, Message message) throws JMSException
- {
- log.debug("public void send(Destination sendDestination = " + sendDestination + ", Message message = "
- + message.getJMSMessageID() + "): called");
-
- // Conversation settings = threadLocals.get();
- // long conversationId = conversationId;
- message.setJMSCorrelationID(Long.toString(conversationId));
- message.setJMSReplyTo(receiveDestination);
-
- // Ensure that the reply queue for this conversation exists.
- initQueueForId(conversationId);
-
- // Check if an overriding send to destination has been set or use the last reply-to if not.
- Destination sendTo = null;
-
- if (sendDestination != null)
- {
- sendTo = sendDestination;
- }
- else
- {
- throw new JMSException("The send destination was specified, and no most recent reply-to available to use.");
- }
-
- // Send the message.
- synchronized (this)
- {
- producer.send(sendTo, message);
- }
- }
-
- /**
- * Gets the next message in an ongoing conversation. This method may block until such a message is received.
- *
- * @return The next incoming message in the conversation.
- *
- * @throws JMSException All undelying JMSExceptions are allowed to fall through. Thrown if the received message
- * did not have its reply-to destination set up.
- */
- public Message receive() throws JMSException
- {
- log.debug("public Message receive(): called");
-
- // Conversation settings = threadLocals.get();
- // long conversationId = settings.conversationId;
-
- // Ensure that the reply queue for this conversation exists.
- initQueueForId(conversationId);
-
- BlockingQueue<Message> queue = idsToQueues.get(conversationId);
-
- try
- {
- Message result = queue.take();
-
- // Keep the reply-to destination to send replies to.
- sendDestination = result.getJMSReplyTo();
-
- return result;
- }
- catch (InterruptedException e)
- {
- return null;
- }
- }
-
- /**
- * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are
- * received they will be returned. If a timeout is specified, then all messages up to the limit, received within
- * that timespan will be returned. At least one of the message count or timeout should be set to a value of
- * 1 or greater.
- *
- * @param num The number of messages to receive, or all if this is less than 1.
- * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1.
- *
- * @return All messages received within the count limit and the timeout.
- *
- * @throws JMSException All undelying JMSExceptions are allowed to fall through.
- */
- public Collection<Message> receiveAll(int num, long timeout) throws JMSException
- {
- log.debug("public Collection<Message> receiveAll(int num = " + num + ", long timeout = " + timeout
- + "): called");
-
- // Check that a timeout or message count was set.
- if ((num < 1) && (timeout < 1))
- {
- throw new IllegalArgumentException("At least one of message count (num) or timeout must be set.");
- }
-
- // Ensure that the reply queue for this conversation exists.
- initQueueForId(conversationId);
- BlockingQueue<Message> queue = idsToQueues.get(conversationId);
-
- // Used to collect the received messages in.
- Collection<Message> result = new ArrayList<Message>();
-
- // Used to indicate when the timeout or message count has expired.
- boolean receiveMore = true;
-
- int messageCount = 0;
-
- // Receive messages until the timeout or message count expires.
- do
- {
- try
- {
- Message next = null;
-
- // Try to receive the message with a timeout if one has been set.
- if (timeout > 0)
- {
- next = queue.poll(timeout, TimeUnit.MILLISECONDS);
-
- // Check if the timeout expired, and stop receiving if so.
- if (next == null)
- {
- receiveMore = false;
- }
- }
- // Receive the message without a timeout.
- else
- {
- next = queue.take();
- }
-
- // Increment the message count if a message was received.
- messageCount += (next != null) ? 1 : 0;
-
- // Check if all the requested messages were received, and stop receiving if so.
- if ((num > 0) && (messageCount >= num))
- {
- receiveMore = false;
- }
-
- // Keep the reply-to destination to send replies to.
- sendDestination = (next != null) ? next.getJMSReplyTo() : sendDestination;
-
- if (next != null)
- {
- result.add(next);
- }
- }
- catch (InterruptedException e)
- {
- // Restore the threads interrupted status.
- Thread.currentThread().interrupt();
-
- // Stop receiving but return the messages received so far.
- receiveMore = false;
- }
- }
- while (receiveMore);
-
- return result;
- }
-
- /**
- * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any
- * incoming messages using them will go to the dead letter box.
- */
- public void end()
- {
- log.debug("public void end(): called");
-
- // Ensure that the thread local for the current thread is cleaned up.
- // Conversation settings = threadLocals.get();
- // long conversationId = settings.conversationId;
- // threadLocals.remove();
-
- // Ensure that its queue is removed from the queue map.
- BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
-
- // Move any outstanding messages on the threads conversation id into the dead letter box.
- queue.drainTo(deadLetterBox);
- }
- }
-
- /**
- * Implements the message listener for this conversation handler.
- */
- protected class Receiver implements MessageListener
- {
- /**
- * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id
- * and placed into queues.
- *
- * @param message The incoming message.
- */
- public void onMessage(Message message)
- {
- log.debug("public void onMessage(Message message = " + message + "): called");
-
- try
- {
- Long conversationId = Long.parseLong(message.getJMSCorrelationID());
-
- // Find the converstaion queue to place the message on. If there is no conversation for the message id,
- // the the dead letter box queue is used.
- BlockingQueue<Message> queue = idsToQueues.get(conversationId);
- queue = (queue == null) ? deadLetterBox : queue;
-
- queue.put(message);
- }
- catch (JMSException e)
- {
- throw new RuntimeException(e);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/util/ClasspathScanner.java b/qpid/java/systests/src/test/java/org/apache/qpid/util/ClasspathScanner.java
deleted file mode 100644
index 151d1473ac..0000000000
--- a/qpid/java/systests/src/test/java/org/apache/qpid/util/ClasspathScanner.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.util;
-
-import org.apache.log4j.Logger;
-
-import java.io.File;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * An ClasspathScanner scans the classpath for classes that implement an interface or extend a base class and have names
- * that match a regular expression.
- *
- * <p/>In order to test whether a class implements an interface or extends a class, the class must be loaded (unless
- * the class files were to be scanned directly). Using this collector can cause problems when it scans the classpath,
- * because loading classes will initialize their statics, which in turn may cause undesired side effects. For this
- * reason, the collector should always be used with a regular expression, through which the class file names are
- * filtered, and only those that pass this filter will be tested. For example, if you define tests in classes that
- * end with the keyword "Test" then use the regular expression "Test$" to match this.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Find all classes matching type and name pattern on the classpath.
- * </table>
- *
- * @todo Add logic to scan jars as well as directories.
- */
-public class ClasspathScanner
-{
- private static final Logger log = Logger.getLogger(ClasspathScanner.class);
-
- /**
- * Scans the classpath and returns all classes that extend a specified class and match a specified name.
- * There is an flag that can be used to indicate that only Java Beans will be matched (that is, only those classes
- * that have a default constructor).
- *
- * @param matchingClass The class or interface to match.
- * @param matchingRegexp The regular expression to match against the class name.
- * @param beanOnly Flag to indicate that onyl classes with default constructors should be matched.
- *
- * @return All the classes that match this collector.
- */
- public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass, String matchingRegexp,
- boolean beanOnly)
- {
- log.debug("public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass = " + matchingClass
- + ", String matchingRegexp = " + matchingRegexp + ", boolean beanOnly = " + beanOnly + "): called");
-
- // Build a compiled regular expression from the pattern to match.
- Pattern matchPattern = Pattern.compile(matchingRegexp);
-
- String classPath = System.getProperty("java.class.path");
- Map<String, Class<? extends T>> result = new HashMap<String, Class<? extends T>>();
-
- log.debug("classPath = " + classPath);
-
- // Find matching classes starting from all roots in the classpath.
- for (String path : splitClassPath(classPath))
- {
- gatherFiles(new File(path), "", result, matchPattern, matchingClass);
- }
-
- return result.values();
- }
-
- /**
- * Finds all matching classes rooted at a given location in the file system. If location is a directory it
- * is recursively examined.
- *
- * @param classRoot The root of the current point in the file system being examined.
- * @param classFileName The name of the current file or directory to examine.
- * @param result The accumulated mapping from class names to classes that match the scan.
- *
- * @todo Recursion ok as file system depth is not likely to exhaust the stack. Might be better to replace with
- * iteration.
- */
- private static <T> void gatherFiles(File classRoot, String classFileName, Map<String, Class<? extends T>> result,
- Pattern matchPattern, Class<? extends T> matchClass)
- {
- log.debug("private static <T> void gatherFiles(File classRoot = " + classRoot + ", String classFileName = "
- + classFileName + ", Map<String, Class<? extends T>> result, Pattern matchPattern = " + matchPattern
- + ", Class<? extends T> matchClass = " + matchClass + "): called");
-
- File thisRoot = new File(classRoot, classFileName);
-
- // If the current location is a file, check if it is a matching class.
- if (thisRoot.isFile())
- {
- // Check that the file has a matching name.
- if (matchesName(thisRoot.getName(), matchPattern))
- {
- String className = classNameFromFile(thisRoot.getName());
-
- // Check that the class has matching type.
- try
- {
- Class<?> candidateClass = Class.forName(className);
-
- Class matchedClass = matchesClass(candidateClass, matchClass);
-
- if (matchedClass != null)
- {
- result.put(className, matchedClass);
- }
- }
- catch (ClassNotFoundException e)
- {
- // Ignore this. The matching class could not be loaded.
- log.debug("Got ClassNotFoundException, ignoring.", e);
- }
- }
-
- return;
- }
- // Otherwise the current location is a directory, so examine all of its contents.
- else
- {
- String[] contents = thisRoot.list();
-
- if (contents != null)
- {
- for (String content : contents)
- {
- gatherFiles(classRoot, classFileName + File.separatorChar + content, result, matchPattern, matchClass);
- }
- }
- }
- }
-
- /**
- * Checks if the specified class file name corresponds to a class with name matching the specified regular expression.
- *
- * @param classFileName The class file name.
- * @param matchPattern The regular expression pattern to match.
- *
- * @return <tt>true</tt> if the class name matches, <tt>false</tt> otherwise.
- */
- private static boolean matchesName(String classFileName, Pattern matchPattern)
- {
- String className = classNameFromFile(classFileName);
- Matcher matcher = matchPattern.matcher(className);
-
- return matcher.matches();
- }
-
- /**
- * Checks if the specified class to compare extends the base class being scanned for.
- *
- * @param matchingClass The base class to match against.
- * @param toMatch The class to match against the base class.
- *
- * @return The class to check, cast as an instance of the class to match if the class extends the base class, or
- * <tt>null</tt> otherwise.
- */
- private static <T> Class<? extends T> matchesClass(Class<?> matchingClass, Class<? extends T> toMatch)
- {
- try
- {
- return matchingClass.asSubclass(toMatch);
- }
- catch (ClassCastException e)
- {
- return null;
- }
- }
-
- /**
- * Takes a classpath (which is a series of paths) and splits it into its component paths.
- *
- * @param classPath The classpath to split.
- *
- * @return A list of the component paths that make up the class path.
- */
- private static List<String> splitClassPath(String classPath)
- {
- List<String> result = new LinkedList<String>();
- String separator = System.getProperty("path.separator");
- StringTokenizer tokenizer = new StringTokenizer(classPath, separator);
-
- while (tokenizer.hasMoreTokens())
- {
- result.add(tokenizer.nextToken());
- }
-
- return result;
- }
-
- /**
- * Translates from the filename of a class to its fully qualified classname. Files are named using forward slash
- * seperators and end in ".class", whereas fully qualified class names use "." sperators and no ".class" ending.
- *
- * @param classFileName The filename of the class to translate to a class name.
- *
- * @return The fully qualified class name.
- */
- private static String classNameFromFile(String classFileName)
- {
- log.debug("private static String classNameFromFile(String classFileName = " + classFileName + "): called");
-
- // Remove the .class ending.
- String s = classFileName.substring(0, classFileName.length() - ".class".length());
-
- // Turn / seperators in . seperators.
- String s2 = s.replace(File.separatorChar, '.');
-
- // Knock off any leading . caused by a leading /.
- if (s2.startsWith("."))
- {
- return s2.substring(1);
- }
-
- return s2;
- }
-}