diff options
15 files changed, 210 insertions, 78 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index d8485ef0f2..117231b36e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -286,12 +286,14 @@ public class AMQChannel * @param tag the tag chosen by the client (if null, server will generate one) * @param queue the queue to subscribe to * @param session the protocol session of the subscriber + * @param noLocal * @return the consumer tag. This is returned to the subscriber and used in * subsequent unsubscribe requests * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ - public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable filters) throws AMQException, ConsumerTagNotUniqueException + public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, + FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -302,7 +304,7 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks, filters); + queue.registerProtocolSession(session, _channelId, tag, acks, filters,noLocal); _consumerTag2QueueMap.put(tag, queue); return tag; } @@ -499,7 +501,7 @@ public class AMQChannel if (_log.isDebugEnabled()) { _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag + - " and multiple " + multiple); + " and multiple " + multiple); } if (multiple) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index bf282020ee..1e57c714ff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -77,7 +77,8 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } try { - String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.arguments); + String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, + body.arguments, body.noLocal); if (!body.nowait) { session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag)); @@ -90,8 +91,8 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { _log.info("Closing connection due to invalid selector"); session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(), - ise.getMessage(), BasicConsumeBody.CLASS_ID, - BasicConsumeBody.METHOD_ID)); + ise.getMessage(), BasicConsumeBody.CLASS_ID, + BasicConsumeBody.METHOD_ID)); } catch (ConsumerTagNotUniqueException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 00ae547683..79b2e11bca 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -78,12 +78,19 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< AuthenticationResult authResult = authMgr.authenticate(ss, body.response); + //save clientProperties + if (protocolSession.getClientProperties() == null) + { + protocolSession.setClientProperties(body.clientProperties); + } + switch (authResult.status) { case ERROR: throw new AMQException("Authentication failed"); case SUCCESS: _logger.info("Connected as: " + ss.getAuthorizationID()); + stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(), HeartbeatConfig.getInstance().getDelay()); @@ -122,7 +129,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< static int getConfiguredFrameSize() { final Configuration config = ApplicationRegistry.getInstance().getConfiguration(); - final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE); + final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE); _logger.info("Framesize set to " + framesize); return framesize; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index e623d23a79..6ba78ba722 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -26,17 +26,19 @@ import org.apache.mina.common.IoSession; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.codec.AMQDecoder; + import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -89,10 +91,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private boolean _closed; // maximum number of channels this session should have private long _maxNoOfChannels = 1000; - + /* AMQP Version for this session */ private byte _major; private byte _minor; + private FieldTable _clientProperties; public ManagedObject getManagedObject() { @@ -128,7 +131,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { return new AMQProtocolSessionMBean(this); } - catch(JMException ex) + catch (JMException ex) { _logger.error("AMQProtocolSession MBean creation has failed ", ex); throw new AMQException("AMQProtocolSession MBean creation has failed ", ex); @@ -153,18 +156,21 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { ProtocolInitiation pi = (ProtocolInitiation) message; // this ensures the codec never checks for a PI message again - ((AMQDecoder)_codecFactory.getDecoder()).setExpectProtocolInitiation(false); - try { + ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); + try + { pi.checkVersion(this); // Fails if not correct // This sets the protocol version (and hence framing classes) for this session. _major = pi.protocolMajor; _minor = pi.protocolMinor; String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); String locales = "en_US"; - AMQFrame response = ConnectionStartBody.createAMQFrame((short)0, pi.protocolMajor, pi.protocolMinor, null, + AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null, mechanisms.getBytes(), locales.getBytes()); _minaProtocolSession.write(response); - } catch (AMQException e) { + } + catch (AMQException e) + { _logger.error("Received incorrect protocol initiation", e); /* Find last protocol version in protocol version list. Make sure last protocol version listed in the build file (build-module.xml) is the latest version which will be used @@ -211,7 +217,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _logger.debug("Method frame received: " + frame); } final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, - (AMQMethodBody)frame.bodyFrame); + (AMQMethodBody) frame.bodyFrame); try { boolean wasAnyoneInterested = false; @@ -266,7 +272,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } - getChannel(frame.channel).publishContentHeader((ContentHeaderBody)frame.bodyFrame); + getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame); } private void contentBodyReceived(AMQFrame frame) throws AMQException @@ -275,7 +281,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } - getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame); + getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame); } /** @@ -355,6 +361,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, * Close a specific channel. This will remove any resources used by the channel, including: * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li> * </ul> + * * @param channelId id of the channel to close * @throws AMQException if an error occurs closing the channel * @throws IllegalArgumentException if the channel id is not valid @@ -381,6 +388,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * In our current implementation this is used by the clustering code. + * * @param channelId */ public void removeChannel(int channelId) @@ -390,11 +398,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * Initialise heartbeats on the session. + * * @param delay delay in seconds (not ms) */ public void initHeartbeats(int delay) { - if(delay > 0) + if (delay > 0) { _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay)); @@ -404,6 +413,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * Closes all channels that were opened by this protocol session. This frees up all resources * used by the channel. + * * @throws AMQException if an error occurs while closing any channel */ private void closeAllChannels() throws AMQException @@ -421,7 +431,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, */ public void closeSession() throws AMQException { - if(!_closed) + if (!_closed) { _closed = true; closeAllChannels(); @@ -463,11 +473,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, // information is used by SASL primary. if (address instanceof InetSocketAddress) { - return ((InetSocketAddress)address).getHostName(); + return ((InetSocketAddress) address).getHostName(); } else if (address instanceof VmPipeAddress) { - return "vmpipe:" + ((VmPipeAddress)address).getPort(); + return "vmpipe:" + ((VmPipeAddress) address).getPort(); } else { @@ -484,22 +494,32 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _saslServer = saslServer; } - + + public FieldTable getClientProperties() + { + return _clientProperties; + } + + public void setClientProperties(FieldTable clientProperties) + { + _clientProperties = clientProperties; + } + /** * Convenience methods for managing AMQP version. * NOTE: Both major and minor will be set to 0 prior to protocol initiation. */ - + public byte getAmqpMajor() { return _major; } - + public byte getAmqpMinor() { return _minor; } - + public boolean amqpVersionEquals(byte major, byte minor) { return _major == major && _minor == minor; diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index acaf6b0d9b..03d0c50dac 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.AMQException; @@ -122,4 +123,9 @@ public interface AMQProtocolSession * @param saslServer */ void setSaslServer(SaslServer saslServer); + + + FieldTable getClientProperties(); + + void setClientProperties(FieldTable clientProperties); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index e64daef690..561b719b2e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -96,7 +96,7 @@ public class AMQQueue implements Managable, Comparable * max allowed number of messages on a queue. */ private Integer _maxMessageCount = 10000; - + /** * max queue depth(KB) for the queue */ @@ -362,12 +362,17 @@ public class AMQQueue implements Managable, Comparable _bindings.addBinding(routingKey, exchange); } - public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) + public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException + { + registerProtocolSession(ps, channel, consumerTag, acks, filters, false); + } + + public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); - Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters); + Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); _subscribers.addSubscriber(subscription); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index d8bb6e1948..8bdadcb493 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -281,7 +281,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } // Only give the message to those that want them. - if (sub.hasFilters() && sub.hasInterest(msg)) + if (sub.hasInterest(msg)) { sub.enqueueForPreDelivery(msg); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java index f464384562..2bb77dc649 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java @@ -33,12 +33,10 @@ import org.apache.qpid.framing.FieldTable; */ public interface SubscriptionFactory { - Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) - throws AMQException; + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, + FieldTable filters, boolean noLocal) throws AMQException; - Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) - throws AMQException; - Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag) - throws AMQException; + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) + throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 79b0593f69..fc00754cda 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; @@ -56,6 +57,7 @@ public class SubscriptionImpl implements Subscription private Queue<AMQMessage> _messages; + private final boolean _noLocal; /** * True if messages need to be acknowledged @@ -65,21 +67,15 @@ public class SubscriptionImpl implements Subscription public static class Factory implements SubscriptionFactory { - public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException + public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters); - } - - public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) - throws AMQException - { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null); + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal); } public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null); + return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false); } } @@ -87,11 +83,11 @@ public class SubscriptionImpl implements Subscription String consumerTag, boolean acks) throws AMQException { - this(channelId, protocolSession, consumerTag, acks, null); + this(channelId, protocolSession, consumerTag, acks, null, false); } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - String consumerTag, boolean acks, FieldTable filters) + String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); @@ -105,6 +101,8 @@ public class SubscriptionImpl implements Subscription this.consumerTag = consumerTag; sessionKey = protocolSession.getKey(); _acks = acks; + _noLocal = noLocal; + _filters = FilterManagerFactory.createManager(filters); if (_filters != null) @@ -218,7 +216,22 @@ public class SubscriptionImpl implements Subscription public boolean hasInterest(AMQMessage msg) { - return _filters.allAllow(msg); + if (_noLocal) + { + return !(protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals( + msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString()))); + } + else + { + if (_filters != null) + { + return _filters.allAllow(msg); + } + else + { + return true; + } + } } public Queue<AMQMessage> getPreDeliveryQueue() @@ -235,8 +248,6 @@ public class SubscriptionImpl implements Subscription } - - private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index a4afe18e4d..91e720ea54 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -139,22 +139,15 @@ class SubscriptionSet implements WeightedSubscriptionManager if (!subscription.isSuspended()) { - if (!subscription.hasFilters()) + if (subscription.hasInterest(msg)) { - return subscription; - } - else - { - if (subscription.hasInterest(msg)) + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty()) { - // if the queue is not empty then this client is ready to receive a message. - //FIXME the queue could be full of sent messages. - // Either need to clean all PDQs after sending a message - // OR have a clean up thread that runs the PDQs expunging the messages. - if (subscription.getPreDeliveryQueue().isEmpty()) - { - return subscription; - } + return subscription; } } } @@ -208,6 +201,7 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Notification that a queue has been deleted. This is called so that the subscription can inform the * channel, which in turn can update its list of unacknowledged messages. + * * @param queue */ public void queueDeleted(AMQQueue queue) diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java index 6ab7808110..d9e946c397 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java +++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import org.apache.qpid.common.QpidProperties; + import java.util.Enumeration; import javax.jms.ConnectionMetaData; @@ -29,7 +31,6 @@ public class QpidConnectionMetaData implements ConnectionMetaData { - QpidConnectionMetaData(AMQConnection conn) { } @@ -46,7 +47,7 @@ public class QpidConnectionMetaData implements ConnectionMetaData public String getJMSProviderName() throws JMSException { - return "Apache Qpid"; + return "Apache " + QpidProperties.getProductName(); } public String getJMSVersion() throws JMSException @@ -71,8 +72,8 @@ public class QpidConnectionMetaData implements ConnectionMetaData public String getProviderVersion() throws JMSException { - return "QPID (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ " - + getProtocolVersion() + "] )"; + return QpidProperties.getProductName() + " (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ " + + getProtocolVersion() + "] )"; } private String getProtocolVersion() @@ -89,8 +90,7 @@ public class QpidConnectionMetaData implements ConnectionMetaData public String getClientVersion() { - // TODO - get client build version from properties file or similar - return "<unknown>"; + return QpidProperties.getBuildVerision(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 9333df3fe4..f7b0cb5331 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -22,6 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.common.QpidProperties; import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.security.AMQCallbackHandler; @@ -119,10 +121,11 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); FieldTable clientProperties = FieldTableFactory.newFieldTable(); - clientProperties.put("instance", ps.getClientID()); - clientProperties.put("product", "Qpid"); - clientProperties.put("version", "1.0"); - clientProperties.put("platform", getFullSystemInfo()); + + clientProperties.put(ClientProperties.instance.toString(), ps.getClientID()); + clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName()); + clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVerision()); + clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo()); ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism, saslResponse, selectedLocale)); } diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java new file mode 100644 index 0000000000..07371b5182 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.common; + +public enum ClientProperties +{ + instance, + product, + version, + platform +} diff --git a/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java b/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java new file mode 100644 index 0000000000..3a96821e93 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.common; + +public class QpidProperties +{ + + static + { + //load values from property file. + } + + public static String getProductName() + { + return "Qpid"; + } + + public static String getReleaseVerision() + { + return "1.0"; + } + + + public static String getBuildVerision() + { + return "1"; + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java index bb8fd5bc19..87e5c43932 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; @@ -121,4 +122,13 @@ public class MockProtocolSession implements AMQProtocolSession public void setSaslServer(SaslServer saslServer) { } + + public FieldTable getClientProperties() + { + return null; + } + + public void setClientProperties(FieldTable clientProperties) + { + } } |