diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-11-07 12:36:36 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-11-07 12:36:36 +0000 |
commit | 793a6b31f7ad6b27f0683eef52ae92e05d7f8fee (patch) | |
tree | 0bbd157b996c54e27bfbaadf6de6f0153909d630 | |
parent | c346e8d12bd8a287695f9a427e22e9c837b866cb (diff) | |
download | qpid-python-793a6b31f7ad6b27f0683eef52ae92e05d7f8fee.tar.gz |
QPID-160 Addition of JMSXUserID to all messages through the java broker.
As this will cause the headers to be re-encoded it can be disabled in the config.xml. Default is enabled as the sample config.xml should have all features enabled so that testing can observe the interactions.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@592729 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 161 insertions, 115 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index b5b81bbeb0..2257a612b3 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -8,9 +8,9 @@ - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - + - - http://www.apache.org/licenses/LICENSE-2.0 - - + - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -50,6 +50,7 @@ <enableDirectBuffers>false</enableDirectBuffers> <framesize>65535</framesize> <compressBufferOnQueue>false</compressBufferOnQueue> + <enableJMSXUserID>true</enableJMSXUserID> </advanced> <security> @@ -84,7 +85,7 @@ <jmx> <access>${conf}/jmxremote.access</access> <principal-database>passwordfile</principal-database> - </jmx> + </jmx> </security> <virtualhosts> diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index d3b459c48a..17300d6b50 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -23,7 +23,9 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; @@ -43,6 +45,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.configuration.Configurator; import java.util.Collection; import java.util.HashMap; @@ -118,9 +121,17 @@ public class AMQChannel private final AMQProtocolSession _session; private boolean _closing; + @Configured(path = "advanced.enableJMSXUserID", + defaultValue = "true") + public boolean ENABLE_JMSXUserID; + + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) - throws AMQException + throws AMQException { + //Set values from configuration + Configurator.configure(this); + _session = session; _channelId = channelId; _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); @@ -199,7 +210,7 @@ public class AMQChannel } public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession) - throws AMQException + throws AMQException { if (_currentMessage == null) { @@ -212,6 +223,16 @@ public class AMQChannel _log.trace(debugIdentity() + "Content header received on channel " + _channelId); } + if (ENABLE_JMSXUserID) + { + //Set JMSXUserID + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties; + //fixme: fudge for QPID-677 + properties.getHeaders().keySet(); + + properties.setUserId(protocolSession.getAuthorizedID().getName()); + } + _currentMessage.setContentHeaderBody(contentHeaderBody); _currentMessage.setExpiration(); @@ -245,8 +266,8 @@ public class AMQChannel // returns true iff the message was delivered (i.e. if all data was // received if (_currentMessage.addContentBodyFrame(_storeContext, - protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk( - contentBody))) + protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk( + contentBody))) { // callback to allow the context to do any post message processing // primary use is to allow message return processing in the non-tx case @@ -303,7 +324,7 @@ public class AMQChannel * @throws AMQException if something goes wrong */ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -327,18 +348,19 @@ public class AMQChannel { _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size()); _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { + { - public boolean callback(UnacknowledgedMessage message) throws AMQException - { - _log.debug(message); + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug(message); - return true; - } + return true; + } - public void visitComplete() - { } - }); + public void visitComplete() + { + } + }); } AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); @@ -418,7 +440,7 @@ public class AMQChannel if (_log.isDebugEnabled()) { _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag - + ") with a queue(" + queue + ") for " + consumerTag); + + ") with a queue(" + queue + ") for " + consumerTag); } } } @@ -464,7 +486,7 @@ public class AMQChannel // if (_nonTransactedContext == null) { _nonTransactedContext = - new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; @@ -527,7 +549,7 @@ public class AMQChannel // if (_nonTransactedContext == null) { _nonTransactedContext = - new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; @@ -547,7 +569,7 @@ public class AMQChannel else { _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() - + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); + + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); // _log.error("Requested requeue of message:" + deliveryTag + // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); // @@ -558,25 +580,26 @@ public class AMQChannel else { _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." - + _unacknowledgedMessageMap.size()); + + _unacknowledgedMessageMap.size()); if (_log.isDebugEnabled()) { _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - int count = 0; + { + int count = 0; - public boolean callback(UnacknowledgedMessage message) throws AMQException - { - _log.debug( + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug( (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]"); - return false; // Continue - } + return false; // Continue + } - public void visitComplete() - { } - }); + public void visitComplete() + { + } + }); } } @@ -603,53 +626,54 @@ public class AMQChannel // Marking messages who still have a consumer for to be resent // and those that don't to be requeued. _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(UnacknowledgedMessage message) throws AMQException { - public boolean callback(UnacknowledgedMessage message) throws AMQException + AMQShortString consumerTag = message.consumerTag; + AMQMessage msg = message.message; + msg.setRedelivered(true); + if (consumerTag != null) { - AMQShortString consumerTag = message.consumerTag; - AMQMessage msg = message.message; - msg.setRedelivered(true); - if (consumerTag != null) + // Consumer exists + if (_consumerTag2QueueMap.containsKey(consumerTag)) { - // Consumer exists - if (_consumerTag2QueueMap.containsKey(consumerTag)) - { - msgToResend.add(message); - } - else // consumer has gone - { - msgToRequeue.add(message); - } + msgToResend.add(message); } - else + else // consumer has gone + { + msgToRequeue.add(message); + } + } + else + { + // Message has no consumer tag, so was "delivered" to a GET + // or consumer no longer registered + // cannot resend, so re-queue. + if (message.queue != null) { - // Message has no consumer tag, so was "delivered" to a GET - // or consumer no longer registered - // cannot resend, so re-queue. - if (message.queue != null) + if (requeue) { - if (requeue) - { - msgToRequeue.add(message); - } - else - { - _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); - } + msgToRequeue.add(message); } else { - _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message); + _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); } } - - // false means continue processing - return false; + else + { + _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message); + } } - public void visitComplete() - { } - }); + // false means continue processing + return false; + } + + public void visitComplete() + { + } + }); // Process Messages to Resend if (_log.isDebugEnabled()) @@ -704,7 +728,7 @@ public class AMQChannel if (_log.isDebugEnabled()) { _log.debug("Subscription(" + System.identityHashCode(sub) - + ") closed during resend so requeuing message"); + + ") closed during resend so requeuing message"); } // move this message to requeue msgToRequeue.add(message); @@ -714,7 +738,7 @@ public class AMQChannel if (_log.isDebugEnabled()) { _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" - + System.identityHashCode(sub)); + + System.identityHashCode(sub)); } sub.addToResendQueue(msg); @@ -728,7 +752,7 @@ public class AMQChannel if (_log.isInfoEnabled()) { _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() - + ")to prevent loss"); + + ")to prevent loss"); } // move this message to requeue msgToRequeue.add(message); @@ -752,7 +776,7 @@ public class AMQChannel if (_nonTransactedContext == null) { _nonTransactedContext = - new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; @@ -786,29 +810,30 @@ public class AMQChannel public void queueDeleted(final AMQQueue queue) throws AMQException { _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(UnacknowledgedMessage message) throws AMQException { - public boolean callback(UnacknowledgedMessage message) throws AMQException + if (message.queue == queue) { - if (message.queue == queue) + try { - try - { - message.discard(_storeContext); - message.queue = null; - } - catch (AMQException e) - { - _log.error( + message.discard(_storeContext); + message.queue = null; + } + catch (AMQException e) + { + _log.error( "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e); - } } - - return false; } - public void visitComplete() - { } - }); + return false; + } + + public void visitComplete() + { + } + }); } /** @@ -856,8 +881,8 @@ public class AMQChannel boolean suspend; suspend = - ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)) - || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes())); + ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)) + || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes())); setSuspended(suspend); } @@ -942,7 +967,7 @@ public class AMQChannel { AMQMessage message = bouncedMessage.getAMQMessage(); session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); + new AMQShortString(bouncedMessage.getMessage())); message.decrementReference(_storeContext); } @@ -959,7 +984,7 @@ public class AMQChannel else { boolean willSuspend = - ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark)); + ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark)); if (!willSuspend) { final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes(); diff --git a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java index a5e89ef4fc..07d5d5dd32 100644 --- a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java +++ b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java @@ -29,9 +29,10 @@ import org.apache.qpid.framing.AMQShortString; public enum CustomJMSXProperty
{
JMS_AMQP_NULL,
- JMS_QPID_DESTTYPE,
+ JMS_QPID_DESTTYPE,
JMSXGroupID,
- JMSXGroupSeq;
+ JMSXGroupSeq,
+ JMSXUserID;
private final AMQShortString _nameAsShortString;
@@ -47,7 +48,7 @@ public enum CustomJMSXProperty }
private static Enumeration _names;
-
+
public static synchronized Enumeration asEnumeration()
{
if(_names == null)
@@ -60,6 +61,6 @@ public enum CustomJMSXProperty }
_names = Collections.enumeration(nameList);
}
- return _names;
+ return _names;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 2dfeb19268..b029770946 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -73,11 +73,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); _strictAMQP = - Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException + AMQShortString routingKey, ByteBuffer data) throws AMQException { this(contentHeader, deliveryTag); @@ -201,7 +201,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach if (!(destination instanceof AMQDestination)) { throw new IllegalArgumentException( - "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); + "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); } final AMQDestination amqd = (AMQDestination) destination; @@ -391,12 +391,26 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getStringProperty(String propertyName) throws JMSException { - if (_strictAMQP) + + if (propertyName.startsWith("JMSX")) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below. + if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString())) + { + return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString(); + } + + return null; } + else + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } - return getJmsHeaders().getString(propertyName); + return getJmsHeaders().getString(propertyName); + } } public Object getObjectProperty(String propertyName) throws JMSException diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index dce9667ff2..d2a7ba301b 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -59,6 +59,7 @@ public class PropertyValueTest extends TestCase implements MessageListener private final List<String> messages = new ArrayList<String>(); private int _count = 1; public String _connectionString = "vm://:1"; + private static final String USERNAME = "guest"; protected void setUp() throws Exception { @@ -109,7 +110,7 @@ public class PropertyValueTest extends TestCase implements MessageListener _logger.error("Run Number:" + run++); try { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); + init(new AMQConnection(_connectionString, USERNAME, "guest", randomize("Client"), "test")); } catch (Exception e) { @@ -175,7 +176,7 @@ public class PropertyValueTest extends TestCase implements MessageListener _logger.trace("Message:" + m); Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(), - m.getStringProperty("TempQueue")); + m.getStringProperty("TempQueue")); m.setJMSType("Test"); m.setLongProperty("UnsignedInt", (long) 4294967295L); @@ -210,7 +211,7 @@ public class PropertyValueTest extends TestCase implements MessageListener try { ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal-bad-scale"), - bd.setScale(Byte.MAX_VALUE + 1)); + bd.setScale(Byte.MAX_VALUE + 1)); fail("UnsupportedOperationException should be thrown as scale can't be correctly transmitted"); } catch (UnsupportedOperationException uoe) @@ -248,47 +249,51 @@ public class PropertyValueTest extends TestCase implements MessageListener Assert.assertEquals("Check Boolean properties are correctly transported", true, m.getBooleanProperty("Bool")); Assert.assertEquals("Check Byte properties are correctly transported", (byte) Byte.MAX_VALUE, - m.getByteProperty("Byte")); + m.getByteProperty("Byte")); Assert.assertEquals("Check Double properties are correctly transported", (double) Double.MAX_VALUE, - m.getDoubleProperty("Double")); + m.getDoubleProperty("Double")); Assert.assertEquals("Check Float properties are correctly transported", (float) Float.MAX_VALUE, - m.getFloatProperty("Float")); + m.getFloatProperty("Float")); Assert.assertEquals("Check Int properties are correctly transported", (int) Integer.MAX_VALUE, - m.getIntProperty("Int")); + m.getIntProperty("Int")); Assert.assertEquals("Check CorrelationID properties are correctly transported", "Correlation", - m.getJMSCorrelationID()); + m.getJMSCorrelationID()); Assert.assertEquals("Check Priority properties are correctly transported", 8, m.getJMSPriority()); // Queue Assert.assertEquals("Check ReplyTo properties are correctly transported", m.getStringProperty("TempQueue"), - m.getJMSReplyTo().toString()); + m.getJMSReplyTo().toString()); Assert.assertEquals("Check Type properties are correctly transported", "Test", m.getJMSType()); Assert.assertEquals("Check Short properties are correctly transported", (short) Short.MAX_VALUE, - m.getShortProperty("Short")); + m.getShortProperty("Short")); Assert.assertEquals("Check UnsignedInt properties are correctly transported", (long) 4294967295L, - m.getLongProperty("UnsignedInt")); + m.getLongProperty("UnsignedInt")); Assert.assertEquals("Check Long properties are correctly transported", (long) Long.MAX_VALUE, - m.getLongProperty("Long")); + m.getLongProperty("Long")); Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String")); // AMQP Tests Specific values Assert.assertEquals("Check Timestamp properties are correctly transported", m.getStringProperty("time-str"), - ((AMQMessage) m).getTimestampProperty(new AMQShortString("time")).toString()); + ((AMQMessage) m).getTimestampProperty(new AMQShortString("time")).toString()); // Decimal BigDecimal bd = new BigDecimal(Integer.MAX_VALUE); Assert.assertEquals("Check decimal properties are correctly transported", bd.setScale(Byte.MAX_VALUE), - ((AMQMessage) m).getDecimalProperty(new AMQShortString("decimal"))); + ((AMQMessage) m).getDecimalProperty(new AMQShortString("decimal"))); // Void ((AMQMessage) m).setVoidProperty(new AMQShortString("void")); Assert.assertTrue("Check void properties are correctly transported", - ((AMQMessage) m).getPropertyHeaders().containsKey("void")); + ((AMQMessage) m).getPropertyHeaders().containsKey("void")); + + //JMSXUserID + Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME, + m.getStringProperty("JMSXUserID")); } received.clear(); |