summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java25
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java76
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Protocol.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java89
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java56
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPortImpl.java82
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java5
15 files changed, 334 insertions, 61 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index f8585344b0..b7be1bfd9b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -50,7 +50,7 @@ public interface ConsumerTarget
AMQSessionModel getSessionModel();
- void send(MessageInstance entry, boolean batch);
+ long send(MessageInstance entry, boolean batch);
void flushBatched();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index f944821c6f..b191db8523 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -1431,15 +1431,22 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
static String interpolate(ConfiguredObject<?> object, String value)
{
- Map<String,String> inheritedContext = new HashMap<String, String>();
- generateInheritedContext(object.getModel(), object, inheritedContext);
- return Strings.expand(value, false,
- JSON_SUBSTITUTION_RESOLVER,
- getOwnAttributeResolver(object),
- new Strings.MapResolver(inheritedContext),
- Strings.JAVA_SYS_PROPS_RESOLVER,
- Strings.ENV_VARS_RESOLVER,
- object.getModel().getTypeRegistry().getDefaultContextResolver());
+ if(object == null)
+ {
+ return value;
+ }
+ else
+ {
+ Map<String, String> inheritedContext = new HashMap<String, String>();
+ generateInheritedContext(object.getModel(), object, inheritedContext);
+ return Strings.expand(value, false,
+ JSON_SUBSTITUTION_RESOLVER,
+ getOwnAttributeResolver(object),
+ new Strings.MapResolver(inheritedContext),
+ Strings.JAVA_SYS_PROPS_RESOLVER,
+ Strings.ENV_VARS_RESOLVER,
+ object.getModel().getTypeRegistry().getDefaultContextResolver());
+ }
}
private static OwnAttributeResolver getOwnAttributeResolver(final ConfiguredObject<?> object)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 982ebb01c6..1a9390f210 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -128,6 +128,18 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedAttribute( defaultValue = "false")
boolean getStatisticsReportingResetEnabled();
+ String BROKER_MESSAGE_COMPRESSION_ENABLED = "broker.messageCompressionEnabled";
+ @ManagedContextDefault(name = BROKER_MESSAGE_COMPRESSION_ENABLED)
+ boolean DEFAULT_MESSAGE_COMPRESSION_ENABLED = true;
+
+ @ManagedAttribute( defaultValue = "${"+ BROKER_MESSAGE_COMPRESSION_ENABLED +"}")
+ boolean isMessageCompressionEnabled();
+
+ String MESSAGE_COMPRESSION_THRESHOLD_SIZE = "connection.messageCompressionThresholdSize";
+ @ManagedContextDefault(name = MESSAGE_COMPRESSION_THRESHOLD_SIZE)
+ int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400;
+
+
@DerivedAttribute( persist = true )
String getModelVersion();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java
index 236e7fdccc..4ef1d315dd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java
@@ -20,11 +20,23 @@
*/
package org.apache.qpid.server.model;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.log4j.Logger;
public class ConfiguredAutomatedAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttribute<C,T>
{
+ private static final Logger LOGGER = Logger.getLogger(ConfiguredAutomatedAttribute.class);
+
private final ManagedAttribute _annotation;
+ private final Method _validValuesMethod;
ConfiguredAutomatedAttribute(final Class<C> clazz,
final Method getter,
@@ -32,6 +44,53 @@ public class ConfiguredAutomatedAttribute<C extends ConfiguredObject, T> extend
{
super(clazz, getter);
_annotation = annotation;
+ Method validValuesMethod = null;
+
+ if(_annotation.validValues().length == 1)
+ {
+ String validValue = _annotation.validValues()[0];
+
+ validValuesMethod = getValidValuesMethod(validValue, clazz);
+ }
+ _validValuesMethod = validValuesMethod;
+ }
+
+ private Method getValidValuesMethod(final String validValue, final Class<C> clazz)
+ {
+ if(validValue.matches("([\\w][\\w\\d_]+\\.)+[\\w][\\w\\d_\\$]*#[\\w\\d_]+\\s*\\(\\s*\\)"))
+ {
+ String function = validValue;
+ try
+ {
+ String className = function.split("#")[0].trim();
+ String methodName = function.split("#")[1].split("\\(")[0].trim();
+ Class<?> validValueCalculatingClass = Class.forName(className);
+ Method method = validValueCalculatingClass.getMethod(methodName);
+ if (Modifier.isStatic(method.getModifiers()) && Modifier.isPublic(method.getModifiers()))
+ {
+ if (Collection.class.isAssignableFrom(method.getReturnType()))
+ {
+ if (method.getGenericReturnType() instanceof ParameterizedType)
+ {
+ Type parameterizedType =
+ ((ParameterizedType) method.getGenericReturnType()).getActualTypeArguments()[0];
+ if (parameterizedType == String.class)
+ {
+ return method;
+ }
+ }
+ }
+ }
+
+ }
+ catch (ClassNotFoundException | NoSuchMethodException e)
+ {
+ LOGGER.warn("The validValues of the " + getName() + " attribute in class " + clazz.getSimpleName()
+ + " has value '" + validValue + "' which looks like it should be a method,"
+ + " but no such method could be used.", e );
+ }
+ }
+ return null;
}
public boolean isAutomated()
@@ -69,4 +128,21 @@ public class ConfiguredAutomatedAttribute<C extends ConfiguredObject, T> extend
return _annotation.description();
}
+ public Collection<String> validValues()
+ {
+ if(_validValuesMethod != null)
+ {
+ try
+ {
+ return (Collection<String>) _validValuesMethod.invoke(null);
+ }
+ catch (InvocationTargetException | IllegalAccessException e)
+ {
+ LOGGER.warn("Could not execute the validValues generation method " + _validValuesMethod.getName(), e);
+ return Collections.emptySet();
+ }
+ }
+ return Arrays.asList(_annotation.validValues());
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index 7a965c19d7..5b3965904e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -43,6 +43,7 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
String TRANSPORT = "transport";
String PORT = "port";
+
@DerivedAttribute
String getClientId();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java
index 2a72c5c5ac..d8b36f487c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java
@@ -34,4 +34,5 @@ public @interface ManagedAttribute
boolean persist() default true;
String defaultValue() default "";
String description() default "";
+ String[] validValues() default {};
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Protocol.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Protocol.java
index 22ea1d9706..a943c0f54c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Protocol.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Protocol.java
@@ -87,6 +87,6 @@ public enum Protocol
public static enum ProtocolType
{
- AMQP, HTTP, JMX, RMI;
+ AMQP, HTTP, JMX, RMI
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 67c713e9d9..af46bae1c4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -92,6 +92,8 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
private int _statisticsReportingPeriod;
@ManagedAttributeField
private boolean _statisticsReportingResetEnabled;
+ @ManagedAttributeField
+ private boolean _messageCompressionEnabled;
private State _state = State.UNINITIALIZED;
@@ -360,6 +362,12 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
@Override
+ public boolean isMessageCompressionEnabled()
+ {
+ return _messageCompressionEnabled;
+ }
+
+ @Override
public String getModelVersion()
{
return BrokerModel.MODEL_VERSION;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index 0d3ec6d3bb..fa599b4d5f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -20,10 +20,14 @@
*/
package org.apache.qpid.server.model.port;
+import java.util.Set;
+
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@ManagedObject( category = false, type = "AMQP")
@@ -57,5 +61,13 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
@ManagedAttribute( mandatory = true )
AuthenticationProvider getAuthenticationProvider();
+
+ @ManagedAttribute( defaultValue = "TCP",
+ validValues = {"org.apache.qpid.server.model.port.AmqpPortImpl#getAllAvailableTransportCombinations()"})
+ Set<Transport> getTransports();
+
+ @ManagedAttribute( validValues = {"org.apache.qpid.server.model.port.AmqpPortImpl#getAllAvailableProtocolCombinations()"} )
+ Set<Protocol> getProtocols();
+
VirtualHostImpl getVirtualHost(String name);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index e6e2d7bbb8..6f6d04c335 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -19,9 +19,12 @@
*/
package org.apache.qpid.server.model.port;
+import java.io.IOException;
+import java.io.StringWriter;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
@@ -32,6 +35,8 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import org.codehaus.jackson.map.ObjectMapper;
+
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.messages.BrokerMessages;
@@ -43,6 +48,7 @@ import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.TrustStore;
+import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.TransportProviderFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
@@ -275,4 +281,87 @@ public class AmqpPortImpl extends AbstractPortWithAuthProvider<AmqpPortImpl> imp
return null;
}
+ public static Set<Protocol> getInstalledProtocols()
+ {
+ Set<Protocol> protocols = new HashSet<>();
+ for(ProtocolEngineCreator installedEngine : (new QpidServiceLoader<ProtocolEngineCreator>()).instancesOf(ProtocolEngineCreator.class))
+ {
+ protocols.add(installedEngine.getVersion());
+ }
+ return protocols;
+ }
+
+ @SuppressWarnings("unused")
+ public static Collection<String> getAllAvailableProtocolCombinations()
+ {
+ Set<Protocol> protocols = getInstalledProtocols();
+
+ Set<Set<String>> last = new HashSet<>();
+ for(Protocol protocol : protocols)
+ {
+ last.add(Collections.singleton(protocol.name()));
+ }
+
+ Set<Set<String>> protocolCombinations = new HashSet<>(last);
+ for(int i = 1; i < protocols.size(); i++)
+ {
+ Set<Set<String>> current = new HashSet<>();
+ for(Set<String> set : last)
+ {
+ for(Protocol p : protocols)
+ {
+ if(!set.contains(p.name()))
+ {
+ Set<String> potential = new HashSet<>(set);
+ potential.add(p.name());
+ current.add(potential);
+ }
+ }
+ }
+ protocolCombinations.addAll(current);
+ last = current;
+ }
+ Set<String> combinationsAsString = new HashSet<>(protocolCombinations.size());
+ ObjectMapper mapper = new ObjectMapper();
+ for(Set<String> combination : protocolCombinations)
+ {
+ try(StringWriter writer = new StringWriter())
+ {
+ mapper.writeValue(writer, combination);
+ combinationsAsString.add(writer.toString());
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("Unexpected IO Exception generating JSON string", e);
+ }
+ }
+ return Collections.unmodifiableSet(combinationsAsString);
+ }
+
+ @SuppressWarnings("unused")
+ public static Collection<String> getAllAvailableTransportCombinations()
+ {
+ Set<Set<Transport>> combinations = new HashSet<>();
+
+ for(TransportProviderFactory providerFactory : (new QpidServiceLoader<TransportProviderFactory>()).instancesOf(TransportProviderFactory.class))
+ {
+ combinations.addAll(providerFactory.getSupportedTransports());
+ }
+
+ Set<String> combinationsAsString = new HashSet<>(combinations.size());
+ ObjectMapper mapper = new ObjectMapper();
+ for(Set<Transport> combination : combinations)
+ {
+ try(StringWriter writer = new StringWriter())
+ {
+ mapper.writeValue(writer, combination);
+ combinationsAsString.add(writer.toString());
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("Unexpected IO Exception generating JSON string", e);
+ }
+ }
+ return Collections.unmodifiableSet(combinationsAsString);
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
index b169b07e35..fa2af121ae 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
@@ -20,10 +20,14 @@
*/
package org.apache.qpid.server.model.port;
+import java.util.Set;
+
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Transport;
@ManagedObject( category = false, type = "HTTP")
public interface HttpPort<X extends HttpPort<X>> extends Port<X>
@@ -42,5 +46,13 @@ public interface HttpPort<X extends HttpPort<X>> extends Port<X>
@ManagedAttribute( mandatory = true )
AuthenticationProvider getAuthenticationProvider();
+
+ @ManagedAttribute( defaultValue = "TCP",
+ validValues = {"[ \"TCP\" ]", "[ \"SSL\" ]", "[ \"TCP\", \"SSL\" ]"})
+ Set<Transport> getTransports();
+
+ @ManagedAttribute( validValues = { "[ \"HTTP\"]"} )
+ Set<Protocol> getProtocols();
+
void setPortManager(PortManager manager);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java
index 56c77cbb03..48754e92e4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java
@@ -20,10 +20,14 @@
*/
package org.apache.qpid.server.model.port;
+import java.util.Set;
+
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Transport;
@ManagedObject( category = false, type = "JMX")
public interface JmxPort<X extends JmxPort<X>> extends Port<X>
@@ -42,5 +46,13 @@ public interface JmxPort<X extends JmxPort<X>> extends Port<X>
@ManagedAttribute( mandatory = true )
AuthenticationProvider getAuthenticationProvider();
+
+ @ManagedAttribute( defaultValue = "TCP",
+ validValues = {"[ \"TCP\" ]", "[ \"SSL\" ]"})
+ Set<Transport> getTransports();
+
+ @ManagedAttribute( validValues = { "[ \"JMX_RMI\"]"} )
+ Set<Protocol> getProtocols();
+
void setPortManager(PortManager manager);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java
index ed975d041a..d2420aa343 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java
@@ -20,65 +20,27 @@
*/
package org.apache.qpid.server.model.port;
-import java.util.Collections;
-import java.util.Map;
import java.util.Set;
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
@ManagedObject( category = false, type = "RMI")
-public class RmiPort extends AbstractPort<RmiPort>
+public interface RmiPort<X extends RmiPort<X>> extends Port<X>
{
- private PortManager _portManager;
- @ManagedObjectFactoryConstructor
- public RmiPort(final Map<String, Object> attributes,
- final Broker<?> broker)
- {
- super(attributes, broker);
- }
+ @ManagedAttribute( validValues = { "[ \"RMI\"]"} )
+ Set<Protocol> getProtocols();
- @Override
- public void onValidate()
- {
- super.onValidate();
+ @ManagedAttribute( defaultValue = "TCP",
+ validValues = {"[ \"TCP\" ]"})
+ Set<Transport> getTransports();
- validateOnlyOneInstance();
- if (getTransports().contains(Transport.SSL))
- {
- throw new IllegalConfigurationException("Can't create RMI registry port which requires SSL");
- }
+ public void setPortManager(PortManager manager);
- }
- @Override
- protected Set<Protocol> getDefaultProtocols()
- {
- return Collections.singleton(Protocol.RMI);
- }
-
- public void setPortManager(PortManager manager)
- {
- _portManager = manager;
- }
-
- @Override
- protected State onActivate()
- {
- if(_portManager != null && _portManager.isActivationAllowed(this))
- {
- return super.onActivate();
- }
- else
- {
- return State.QUIESCED;
- }
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPortImpl.java
new file mode 100644
index 0000000000..e236b7cb91
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPortImpl.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.port;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Transport;
+
+public class RmiPortImpl extends AbstractPort<RmiPortImpl> implements RmiPort<RmiPortImpl>
+{
+ private PortManager _portManager;
+
+ @ManagedObjectFactoryConstructor
+ public RmiPortImpl(final Map<String, Object> attributes,
+ final Broker<?> broker)
+ {
+ super(attributes, broker);
+ }
+
+ @Override
+ public void onValidate()
+ {
+ super.onValidate();
+
+ validateOnlyOneInstance();
+
+ if (getTransports().contains(Transport.SSL))
+ {
+ throw new IllegalConfigurationException("Can't create RMI registry port which requires SSL");
+ }
+
+ }
+
+ @Override
+ protected Set<Protocol> getDefaultProtocols()
+ {
+ return Collections.singleton(Protocol.RMI);
+ }
+
+ public void setPortManager(PortManager manager)
+ {
+ _portManager = manager;
+ }
+
+ @Override
+ protected State onActivate()
+ {
+ if(_portManager != null && _portManager.isActivationAllowed(this))
+ {
+ return super.onActivate();
+ }
+ else
+ {
+ return State.QUIESCED;
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index d80aa92007..4044c938db 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -471,9 +471,8 @@ class QueueConsumerImpl
public final void send(final QueueEntry entry, final boolean batch)
{
_deliveredCount.incrementAndGet();
- ServerMessage message = entry.getMessage();
- _deliveredBytes.addAndGet(message.getSize());
- _target.send(entry, batch);
+ long size = _target.send(entry, batch);
+ _deliveredBytes.addAndGet(size);
}
@Override