summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-10-20 21:19:00 +0000
committerRobert Greig <rgreig@apache.org>2006-10-20 21:19:00 +0000
commit67e13cee9877551206c3e1418155f5a7fc50d827 (patch)
tree2d98fee6175ae551e8f887c2ab93b1dc2c658bb7
parente4240d67de8cc77c290fb40c3fa773ec5c910f49 (diff)
downloadqpid-python-67e13cee9877551206c3e1418155f5a7fc50d827.tar.gz
Merge from trunk up to revision 466241
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@466266 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/etc/log4j.xml2
-rw-r--r--java/broker/src/org/apache/qpid/server/AMQChannel.java180
-rw-r--r--java/broker/src/org/apache/qpid/server/ack/TxAck.java129
-rw-r--r--java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java48
-rw-r--r--java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java27
-rw-r--r--java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java81
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQMessage.java13
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQQueue.java24
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java32
-rw-r--r--java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java116
-rw-r--r--java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java186
-rw-r--r--java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java34
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/AckTest.java37
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java95
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/UnitTests.java2
-rw-r--r--java/client/src/org/apache/qpid/client/AMQHeadersExchange.java6
-rw-r--r--java/client/src/org/apache/qpid/client/AMQSession.java47
-rw-r--r--java/client/src/org/apache/qpid/client/BasicMessageConsumer.java53
-rw-r--r--java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java17
-rw-r--r--java/client/src/org/apache/qpid/jms/Session.java7
-rw-r--r--java/client/src/org/apache/qpid/jndi/Example.properties21
-rw-r--r--java/client/src/org/apache/qpid/jndi/NameParserImpl.java37
-rw-r--r--java/client/src/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java289
-rw-r--r--java/client/src/org/apache/qpid/jndi/ReadOnlyContext.java497
-rw-r--r--java/client/test/src/org/apache/qpid/headers/MessageFactory.java13
-rw-r--r--java/client/test/src/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java110
-rw-r--r--java/client/test/src/org/apache/qpid/transacted/TransactedTest.java2
-rw-r--r--java/common/bin/qpid-run51
-rw-r--r--java/common/src/org/apache/qpid/framing/ContentBody.java25
29 files changed, 1805 insertions, 376 deletions
diff --git a/java/broker/etc/log4j.xml b/java/broker/etc/log4j.xml
index 68d9691cc2..ef16c0c370 100644
--- a/java/broker/etc/log4j.xml
+++ b/java/broker/etc/log4j.xml
@@ -19,7 +19,7 @@
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="FileAppender" class="org.apache.log4j.FileAppender">
- <param name="File" value="${QPID_WORK}/log/qpid.log"/>
+ <param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/>
<param name="Append" value="false"/>
<layout class="org.apache.log4j.PatternLayout">
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java
index 78889e69b5..055af16159 100644
--- a/java/broker/src/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java
@@ -23,6 +23,10 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.ack.TxAck;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
@@ -31,7 +35,6 @@ import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;
-import org.apache.qpid.server.util.OrderedMapHelper;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
@@ -93,36 +96,17 @@ public class AMQChannel
private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
+ private long _lastDeliveryTag;
+
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private final MessageRouter _exchanges;
private final TxnBuffer _txnBuffer;
- private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
-
- public static class UnacknowledgedMessage
- {
- public final AMQMessage message;
- public final String consumerTag;
- public AMQQueue queue;
+ private TxAck ackOp;
- public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag)
- {
- this.queue = queue;
- this.message = message;
- this.consumerTag = consumerTag;
- }
-
- private void discard() throws AMQException
- {
- if (queue != null)
- {
- message.dequeue(queue);
- }
- message.decrementReference();
- }
- }
+ private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
@@ -352,7 +336,8 @@ public class AMQChannel
{
synchronized(_unacknowledgedMessageMapLock)
{
- _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag));
+ _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+ _lastDeliveryTag = deliveryTag;
checkSuspension();
}
}
@@ -444,13 +429,32 @@ public class AMQChannel
{
if (_transactional)
{
- try
+ //check that the tag exists to give early failure
+ if(!multiple || deliveryTag > 0)
{
- _txnBuffer.enlist(new Ack(getUnackedMessageFinder().getValues(deliveryTag, multiple)));
+ checkAck(deliveryTag);
}
- catch(NoSuchElementException e)
+ //we use a single txn op for all acks and update this op
+ //as new acks come in. If this is the first ack in the txn
+ //we will need to create and enlist the op.
+ if(ackOp == null)
{
- throw new AMQException("Received ack for unrecognised delivery tag: " + deliveryTag);
+ ackOp = new TxAck(new AckMap());
+ _txnBuffer.enlist(ackOp);
+ }
+ //update the op to include this ack request
+ if(multiple && deliveryTag == 0)
+ {
+ synchronized(_unacknowledgedMessageMapLock)
+ {
+ //if have signalled to ack all, that refers only
+ //to all at this time
+ ackOp.update(_lastDeliveryTag, multiple);
+ }
+ }
+ else
+ {
+ ackOp.update(deliveryTag, multiple);
}
}
else
@@ -459,6 +463,17 @@ public class AMQChannel
}
}
+ private void checkAck(long deliveryTag) throws AMQException
+ {
+ synchronized(_unacknowledgedMessageMapLock)
+ {
+ if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
+ {
+ throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
+ }
+ }
+ }
+
private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
{
if (multiple)
@@ -587,6 +602,16 @@ public class AMQChannel
public void commit() throws AMQException
{
+ if(ackOp != null)
+ {
+ ackOp.consolidate();
+ if(ackOp.checkPersistent())
+ {
+ _txnBuffer.containsPersistentChanges();
+ }
+ ackOp = null;//already enlisted, after commit will reset regardless of outcome
+ }
+
_txnBuffer.commit();
//TODO: may need to return 'immediate' messages at this point
}
@@ -636,69 +661,22 @@ public class AMQChannel
_returns.clear();
}
- private OrderedMapHelper<Long, UnacknowledgedMessage> getUnackedMessageFinder()
- {
- return new OrderedMapHelper<Long, UnacknowledgedMessage>(_unacknowledgedMessageMap, _unacknowledgedMessageMapLock, 0L);
- }
-
-
- private class Ack implements TxnOp
+ //we use this wrapper to ensure we are always using the correct
+ //map instance (its not final unfortunately)
+ private class AckMap implements UnacknowledgedMessageMap
{
- private final Map<Long, UnacknowledgedMessage> _unacked;
-
- Ack(Map<Long, UnacknowledgedMessage> unacked) throws AMQException
+ public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
{
- _unacked = unacked;
-
- //if any of the messages in unacked are persistent the txn
- //buffer must be marked as persistent:
- for(UnacknowledgedMessage msg : _unacked.values())
- {
- if(msg.message.isPersistent())
- {
- _txnBuffer.containsPersistentChanges();
- break;
- }
- }
+ impl().collect(deliveryTag, multiple, msgs);
}
-
- public void prepare() throws AMQException
+ public void remove(List<UnacknowledgedMessage> msgs)
{
- //make persistent changes, i.e. dequeue and decrementReference
- for(UnacknowledgedMessage msg : _unacked.values())
- {
- msg.discard();
- }
- }
-
- public void undoPrepare()
- {
- //decrementReference is annoyingly untransactional (due to
- //in memory counter) so if we failed in prepare for full
- //txn, this op will have to compensate by fixing the count
- //in memory (persistent changes will be rolled back by store)
- for(UnacknowledgedMessage msg : _unacked.values())
- {
-
- msg.message.incrementReference();
- }
- }
-
- public void commit()
- {
- //remove the unacked messages from the channels map
- synchronized(_unacknowledgedMessageMapLock)
- {
- for(long tag : _unacked.keySet())
- {
- _unacknowledgedMessageMap.remove(tag);
- }
- }
-
+ impl().remove(msgs);
}
- public void rollback()
+ private UnacknowledgedMessageMap impl()
{
+ return new UnacknowledgedMessageMapImpl(_unacknowledgedMessageMapLock, _unacknowledgedMessageMap);
}
}
@@ -745,17 +723,6 @@ public class AMQChannel
public void prepare() throws AMQException
{
- //the routers reference can now be released
- _msg.decrementReference();
- try
- {
- _msg.checkDeliveredToConsumer();
- }
- catch(NoConsumersException e)
- {
- //TODO: store this for delivery after the commit-ok
- _returns.add(e.getReturnMessage(_channelId));
- }
}
public void undoPrepare()
@@ -767,6 +734,27 @@ public class AMQChannel
public void commit()
{
+ //The routers reference can now be released. This is done
+ //here to ensure that it happens after the queues that
+ //enqueue it have incremented their counts (which as a
+ //memory only operation is done in the commit phase).
+ try
+ {
+ _msg.decrementReference();
+ }
+ catch(AMQException e)
+ {
+ _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
+ }
+ try
+ {
+ _msg.checkDeliveredToConsumer();
+ }
+ catch(NoConsumersException e)
+ {
+ //TODO: store this for delivery after the commit-ok
+ _returns.add(e.getReturnMessage(_channelId));
+ }
}
public void rollback()
diff --git a/java/broker/src/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/org/apache/qpid/server/ack/TxAck.java
new file mode 100644
index 0000000000..0d502a8f6e
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/ack/TxAck.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.txn.TxnOp;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A TxnOp implementation for handling accumulated acks
+ */
+public class TxAck implements TxnOp
+{
+ private final UnacknowledgedMessageMap _map;
+ private final List <UnacknowledgedMessage> _unacked = new LinkedList<UnacknowledgedMessage>();
+ private final List<Long> _individual = new LinkedList<Long>();
+ private long _deliveryTag;
+ private boolean _multiple;
+
+ public TxAck(UnacknowledgedMessageMap map)
+ {
+ _map = map;
+ }
+
+ public void update(long deliveryTag, boolean multiple)
+ {
+ if(!multiple)
+ {
+ //have acked a single message that is not part of
+ //the previously acked region so record
+ //individually
+ _individual.add(deliveryTag);//_multiple && !multiple
+ }
+ else if(deliveryTag > _deliveryTag)
+ {
+ //have simply moved the last acked message on a
+ //bit
+ _deliveryTag = deliveryTag;
+ _multiple = true;
+ }
+ }
+
+ public void consolidate()
+ {
+ //lookup all the unacked messages that have been acked in this transaction
+ if(_multiple)
+ {
+ //get all the unacked messages for the accumulated
+ //multiple acks
+ _map.collect(_deliveryTag, true, _unacked);
+ }
+ //get any unacked messages for individual acks outside the
+ //range covered by multiple acks
+ for(long tag : _individual)
+ {
+ if(_deliveryTag < tag)
+ {
+ _map.collect(tag, false, _unacked);
+ }
+ }
+ }
+
+ public boolean checkPersistent() throws AMQException
+ {
+ //if any of the messages in unacked are persistent the txn
+ //buffer must be marked as persistent:
+ for(UnacknowledgedMessage msg : _unacked)
+ {
+ if(msg.message.isPersistent())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void prepare() throws AMQException
+ {
+ //make persistent changes, i.e. dequeue and decrementReference
+ for(UnacknowledgedMessage msg : _unacked)
+ {
+ msg.discard();
+ }
+ }
+
+ public void undoPrepare()
+ {
+ //decrementReference is annoyingly untransactional (due to
+ //in memory counter) so if we failed in prepare for full
+ //txn, this op will have to compensate by fixing the count
+ //in memory (persistent changes will be rolled back by store)
+ for(UnacknowledgedMessage msg : _unacked)
+ {
+ msg.message.incrementReference();
+ }
+ }
+
+ public void commit()
+ {
+ //remove the unacked messages from the channels map
+ _map.remove(_unacked);
+ }
+
+ public void rollback()
+ {
+ }
+}
+
diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java
new file mode 100644
index 0000000000..f13ecef7ac
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class UnacknowledgedMessage
+{
+ public final AMQMessage message;
+ public final String consumerTag;
+ public final long deliveryTag;
+ public AMQQueue queue;
+
+ public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag, long deliveryTag)
+ {
+ this.queue = queue;
+ this.message = message;
+ this.consumerTag = consumerTag;
+ this.deliveryTag = deliveryTag;
+ }
+
+ public void discard() throws AMQException
+ {
+ if (queue != null)
+ {
+ message.dequeue(queue);
+ }
+ message.decrementReference();
+ }
+}
+
diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
new file mode 100644
index 0000000000..ccb55f7e2d
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import java.util.List;
+
+public interface UnacknowledgedMessageMap
+{
+ public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
+ public void remove(List<UnacknowledgedMessage> msgs);
+}
+
diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
new file mode 100644
index 0000000000..117793930f
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import java.util.List;
+import java.util.Map;
+
+public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
+{
+ private final Object _lock;
+ private Map<Long, UnacknowledgedMessage> _map;
+
+ public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
+ {
+ _lock = lock;
+ _map = map;
+ }
+
+ public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+ {
+ if (multiple)
+ {
+ collect(deliveryTag, msgs);
+ }
+ else
+ {
+ msgs.add(get(deliveryTag));
+ }
+
+ }
+
+ public void remove(List<UnacknowledgedMessage> msgs)
+ {
+ synchronized(_lock)
+ {
+ for(UnacknowledgedMessage msg : msgs)
+ {
+ _map.remove(msg.deliveryTag);
+ }
+ }
+ }
+
+ private UnacknowledgedMessage get(long key)
+ {
+ synchronized(_lock)
+ {
+ return _map.get(key);
+ }
+ }
+
+ private void collect(long key, List<UnacknowledgedMessage> msgs)
+ {
+ synchronized(_lock)
+ {
+ for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+ {
+ msgs.add(entry.getValue());
+ if (entry.getKey() == key)
+ {
+ break;
+ }
+ }
+ }
+ }
+}
+
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
index 6573be782b..7666b5b3f8 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
@@ -255,7 +255,7 @@ public class AMQMessage
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
*/
- public void decrementReference() throws AMQException
+ public void decrementReference() throws MessageCleanupException
{
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
@@ -263,7 +263,16 @@ public class AMQMessage
// not relying on the all the increments having taken place before the delivery manager decrements.
if (_referenceCount.decrementAndGet() == 0)
{
- _store.removeMessage(_messageId);
+ try
+ {
+ _store.removeMessage(_messageId);
+ }
+ catch(AMQException e)
+ {
+ //to maintain consistency, we revert the count
+ incrementReference();
+ throw new MessageCleanupException(_messageId, e);
+ }
}
}
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
index 6c795590f6..7fb6ddcb5c 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
@@ -707,8 +707,16 @@ public class AMQQueue implements Managable
{
try
{
- msg.decrementReference();
msg.dequeue(this);
+ msg.decrementReference();
+ }
+ catch(MessageCleanupException e)
+ {
+ //Message was dequeued, but could notthen be deleted
+ //though it is no longer referenced. This should be very
+ //rare and can be detected and cleaned up on recovery or
+ //done through some form of manual intervention.
+ _logger.error(e, e);
}
catch(AMQException e)
{
@@ -777,7 +785,8 @@ public class AMQQueue implements Managable
public void prepare() throws AMQException
{
- record(_msg);
+ //do the persistent part of the record()
+ _msg.enqueue(AMQQueue.this);
}
public void undoPrepare()
@@ -786,6 +795,9 @@ public class AMQQueue implements Managable
public void commit()
{
+ //do the memeory part of the record()
+ _msg.incrementReference();
+ //then process the message
try
{
process(_msg);
@@ -799,14 +811,6 @@ public class AMQQueue implements Managable
public void rollback()
{
- try
- {
- _msg.decrementReference();
- }
- catch (AMQException e)
- {
- _logger.error("Error rolling back a queue delivery: " + e, e);
- }
}
}
diff --git a/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java
new file mode 100644
index 0000000000..6f0fa285b1
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Signals that the removal of a message once its refcount reached
+ * zero failed.
+ */
+public class MessageCleanupException extends AMQException
+{
+ public MessageCleanupException(long messageId, AMQException e)
+ {
+ super("Failed to cleanup message with id " + messageId, e);
+ }
+}
diff --git a/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java b/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java
deleted file mode 100644
index 2c24944317..0000000000
--- a/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import java.util.List;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-/**
- * Utility class used by AMQChannel to retrieve unacknowledged
- * messages. Made generic to avoid exposing the inner class in
- * AMQChannel. Put in this package to keep ot out the way.
- */
-public class OrderedMapHelper<K, V>
-{
- private final Map<K, V> _map;
- private final Object _lock;
- private final K _wildcard;
-
- public OrderedMapHelper(Map<K, V> map, Object lock, K wildcard)
- {
- _map = map;
- _lock = lock;
- _wildcard = wildcard;
- }
-
- /**
- * Assumes the map passed in is ordered. Returns a copy of the
- * map containing an individual key-value pair or a list of
- * key-values upto and including the one matching the given
- * key. If multiple == true and the key == the wildcard specified
- * on construction, then all the values in the map will be
- * returned.
- */
- public Map<K, V> getValues(K key, boolean multiple) throws NoSuchElementException
- {
- if (multiple)
- {
- if(key == _wildcard)
- {
- synchronized(_lock)
- {
- return new LinkedHashMap<K, V>(_map);
- }
- }
- else
- {
- return getValues(key);
- }
- }
- else
- {
- Map<K, V> values = new LinkedHashMap<K, V>();
- values.put(key, getValue(key));
- return values;
- }
- }
-
- private V getValue(K key) throws NoSuchElementException
- {
- V value;
- synchronized(_lock)
- {
- value = _map.get(key);
- }
-
- if(value == null)
- {
- throw new NoSuchElementException();
- }
- else
- {
- return value;
- }
- }
-
- private Map<K, V> getValues(K key) throws NoSuchElementException
- {
- Map<K, V> values = new LinkedHashMap<K, V>();
- synchronized(_lock)
- {
- if (!_map.containsKey(key))
- {
- throw new NoSuchElementException();
- }
-
- for(Map.Entry<K, V> entry : _map.entrySet())
- {
- values.put(entry.getKey(), entry.getValue());
- if (entry.getKey() == key)
- {
- break;
- }
- }
- }
- return values;
- }
-
-} \ No newline at end of file
diff --git a/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java b/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
new file mode 100644
index 0000000000..4bf23ec25a
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+public class TxAckTest
+{
+ private Scenario individual;
+ private Scenario multiple;
+ private Scenario combined;
+
+ @Before
+ public void setup() throws Exception
+ {
+ //ack only 5th msg
+ individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l));
+ individual.update(5, false);
+
+ //ack all up to and including 5th msg
+ multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l));
+ multiple.update(5, true);
+
+ //leave only 8th and 9th unacked
+ combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l));
+ combined.update(3, false);
+ combined.update(5, true);
+ combined.update(7, true);
+ combined.update(2, true);//should be ignored
+ combined.update(1, false);//should be ignored
+ combined.update(10, false);
+ }
+
+ @Test
+ public void prepare() throws AMQException
+ {
+ individual.prepare();
+ multiple.prepare();
+ combined.prepare();
+ }
+
+ @Test
+ public void undoPrepare() throws AMQException
+ {
+ individual.undoPrepare();
+ multiple.undoPrepare();
+ combined.undoPrepare();
+ }
+
+ @Test
+ public void commit() throws AMQException
+ {
+ individual.commit();
+ multiple.commit();
+ combined.commit();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(TxAckTest.class);
+ }
+
+ private class Scenario
+ {
+ private final LinkedHashMap<Long, UnacknowledgedMessage> _messages = new LinkedHashMap<Long, UnacknowledgedMessage>();
+ private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(_messages, _messages);
+ private final TxAck _op = new TxAck(_map);
+ private final List<Long> _acked;
+ private final List<Long> _unacked;
+
+ Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+ {
+ for(int i = 0; i < messageCount; i++)
+ {
+ long deliveryTag = i + 1;
+ _messages.put(deliveryTag, new UnacknowledgedMessage(null, new TestMessage(deliveryTag), null, deliveryTag));
+ }
+ _acked = acked;
+ _unacked = unacked;
+ }
+
+ void update(long deliverytag, boolean multiple)
+ {
+ _op.update(deliverytag, multiple);
+ }
+
+ private void assertCount(List<Long> tags, int expected)
+ {
+ for(long tag : tags)
+ {
+ UnacknowledgedMessage u = _messages.get(tag);
+ assertTrue("Message not found for tag " + tag, u != null);
+ ((TestMessage) u.message).assertCountEquals(expected);
+ }
+ }
+
+ void prepare() throws AMQException
+ {
+ _op.consolidate();
+ _op.prepare();
+
+ assertCount(_acked, -1);
+ assertCount(_unacked, 0);
+
+ }
+ void undoPrepare()
+ {
+ _op.consolidate();
+ _op.undoPrepare();
+
+ assertCount(_acked, 1);
+ assertCount(_unacked, 0);
+ }
+
+ void commit()
+ {
+ _op.consolidate();
+ _op.commit();
+
+
+ //check acked messages are removed from map
+ HashSet<Long> keys = new HashSet<Long>(_messages.keySet());
+ keys.retainAll(_acked);
+ assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
+ //check unacked messages are still in map
+ keys = new HashSet<Long>(_unacked);
+ keys.removeAll(_messages.keySet());
+ assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
+ }
+ }
+
+ private class TestMessage extends AMQMessage
+ {
+ private final long _tag;
+ private int _count;
+
+ TestMessage(long tag)
+ {
+ super(new TestableMemoryMessageStore(), null);
+ _tag = tag;
+ }
+
+ public void incrementReference()
+ {
+ _count++;
+ }
+
+ public void decrementReference()
+ {
+ _count--;
+ }
+
+ void assertCountEquals(int expected)
+ {
+ assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ }
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java
new file mode 100644
index 0000000000..3d7db01365
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ TxAckTest.class
+})
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
index 8ce006b3ae..f22ccb8caf 100644
--- a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
+++ b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.TestApplicationRegistry;
@@ -93,15 +94,15 @@ public class AckTest
final int msgCount = 10;
publishMessages(msgCount);
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount);
- Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
for (int i = 1; i <= map.size(); i++)
{
- Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
assertTrue(entry.getKey() == i);
- AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ UnacknowledgedMessage unackedMsg = entry.getValue();
assertTrue(unackedMsg.queue == _queue);
}
assertTrue(_messageStore.getMessageMap().size() == msgCount);
@@ -118,7 +119,7 @@ public class AckTest
final int msgCount = 10;
publishMessages(msgCount);
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
assertTrue(_messageStore.getMessageMap().size() == 0);
}
@@ -135,16 +136,16 @@ public class AckTest
publishMessages(msgCount);
_channel.acknowledgeMessage(5, false);
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount - 1);
- Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
int i = 1;
while (i <= map.size())
{
- Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
assertTrue(entry.getKey() == i);
- AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ UnacknowledgedMessage unackedMsg = entry.getValue();
assertTrue(unackedMsg.queue == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
@@ -166,16 +167,16 @@ public class AckTest
publishMessages(msgCount);
_channel.acknowledgeMessage(5, true);
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
- Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
int i = 1;
while (i <= map.size())
{
- Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
assertTrue(entry.getKey() == i + 5);
- AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ UnacknowledgedMessage unackedMsg = entry.getValue();
assertTrue(unackedMsg.queue == _queue);
++i;
}
@@ -193,16 +194,16 @@ public class AckTest
publishMessages(msgCount);
_channel.acknowledgeMessage(0, true);
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
int i = 1;
while (i <= map.size())
{
- Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
assertTrue(entry.getKey() == i + 5);
- AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ UnacknowledgedMessage unackedMsg = entry.getValue();
assertTrue(unackedMsg.queue == _queue);
++i;
}
@@ -221,7 +222,7 @@ public class AckTest
// at this point we should have sent out only 5 messages with a further 5 queued
// up in the channel which should be suspended
assertTrue(_subscription.isSuspended());
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
_channel.acknowledgeMessage(5, true);
assertTrue(!_subscription.isSuspended());
diff --git a/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java b/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java
deleted file mode 100644
index 74340e98c0..0000000000
--- a/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import junit.framework.JUnit4TestAdapter;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.Ignore;
-import org.apache.qpid.AMQException;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class OrderedMapHelperTest
-{
- private final Object lock = new Object();
- private final Map<Integer, String> map = new LinkedHashMap<Integer, String>();
- private final OrderedMapHelper<Integer, String> helper = new OrderedMapHelper<Integer, String>(map, lock, 0);
-
- @Before
- public void setup() throws Exception
- {
- map.put(1, "One");
- map.put(2, "Two");
- map.put(5, "Five");
- map.put(8, "Eight");
- map.put(10, "Ten");
- }
-
- @Test
- public void specific()
- {
- Map<Integer, String> slice = helper.getValues(5, false);
- assertEquals(1, slice.size());
- assertTrue(slice.containsKey(5));
- assertTrue(slice.containsValue("Five"));
- assertEquals("Five", slice.get(5));
- }
-
- @Test
- public void multiple()
- {
- Map<Integer, String> slice = helper.getValues(5, true);
- assertEquals(3, slice.size());
-
- assertTrue(slice.containsKey(1));
- assertTrue(slice.containsKey(2));
- assertTrue(slice.containsKey(5));
-
- assertEquals("One", slice.get(1));
- assertEquals("Two", slice.get(2));
- assertEquals("Five", slice.get(5));
- }
-
- @Test
- public void all()
- {
- Map<Integer, String> slice = helper.getValues(0/*the 'wildcard'*/, true);
- assertEquals(5, slice.size());
-
- assertTrue(slice.containsKey(1));
- assertTrue(slice.containsKey(2));
- assertTrue(slice.containsKey(5));
- assertTrue(slice.containsKey(8));
- assertTrue(slice.containsKey(10));
-
- assertEquals("One", slice.get(1));
- assertEquals("Two", slice.get(2));
- assertEquals("Five", slice.get(5));
- assertEquals("Eight", slice.get(8));
- assertEquals("Ten", slice.get(10));
- }
-
- public static junit.framework.Test suite()
- {
- return new JUnit4TestAdapter(OrderedMapHelperTest.class);
- }
-}
diff --git a/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
index 5c2242fb0d..d6cc471413 100644
--- a/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
+++ b/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
@@ -22,7 +22,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@RunWith(Suite.class)
-@Suite.SuiteClasses({LoggingProxyTest.class, OrderedMapHelperTest.class})
+@Suite.SuiteClasses({LoggingProxyTest.class})
public class UnitTests
{
public static junit.framework.Test suite()
diff --git a/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java b/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java
index 5b8de7a13f..257a9374e8 100644
--- a/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java
+++ b/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java
@@ -18,12 +18,18 @@
package org.apache.qpid.client;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.url.BindingURL;
/**
* A destination backed by a headers exchange
*/
public class AMQHeadersExchange extends AMQDestination
{
+ public AMQHeadersExchange(BindingURL binding)
+ {
+ this(binding.getExchangeName());
+ }
+
public AMQHeadersExchange(String queueName)
{
super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null);
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java
index 54ffc979af..8b2c2ec04d 100644
--- a/java/client/src/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/org/apache/qpid/client/AMQSession.java
@@ -35,12 +35,12 @@ import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.io.Serializable;
+import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.text.MessageFormat;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -720,18 +720,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
- return createConsumer(destination, _defaultPrefetchHighMark, false, false, null);
+ return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
- return createConsumer(destination, _defaultPrefetchHighMark, false, false, messageSelector);
+ return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
- return createConsumer(destination, _defaultPrefetchHighMark, noLocal, false, messageSelector);
+ return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination,
@@ -740,7 +740,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean exclusive,
String selector) throws JMSException
{
- return createConsumer(destination, prefetch, noLocal, exclusive, selector, null);
+ return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+ }
+
+
+ public MessageConsumer createConsumer(Destination destination,
+ int prefetchHigh,
+ int prefetchLow,
+ boolean noLocal,
+ boolean exclusive,
+ String selector) throws JMSException
+ {
+ return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
}
public MessageConsumer createConsumer(Destination destination,
@@ -750,12 +761,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector,
FieldTable rawSelector) throws JMSException
{
- return createConsumerImpl(destination, prefetch, noLocal, exclusive,
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
+ selector, rawSelector);
+ }
+
+ public MessageConsumer createConsumer(Destination destination,
+ int prefetchHigh,
+ int prefetchLow,
+ boolean noLocal,
+ boolean exclusive,
+ String selector,
+ FieldTable rawSelector) throws JMSException
+ {
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
selector, rawSelector);
}
protected MessageConsumer createConsumerImpl(final Destination destination,
- final int prefetch,
+ final int prefetchHigh,
+ final int prefetchLow,
final boolean noLocal,
final boolean exclusive,
final String selector,
@@ -780,7 +804,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
_messageFactoryRegistry, AMQSession.this,
- protocolHandler, ft, prefetch, exclusive,
+ protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
_acknowledgeMode);
try
@@ -862,9 +886,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param queueName
* @return the consumer tag generated by the broker
*/
- private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetch,
+ private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException
{
+ //fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
@@ -1118,8 +1143,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetch(), consumer.isNoLocal(),
- consumer.isExclusive(), consumer.getAcknowledgeMode());
+ String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(),
+ consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode());
consumer.setConsumerTag(consumerTag);
_consumers.put(consumerTag, consumer);
diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
index b46c5f111d..a6f89fd221 100644
--- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
@@ -91,9 +91,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private FieldTable _rawSelectorFieldTable;
/**
- * We store the prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover
*/
- private int _prefetch;
+ private int _prefetchHigh;
+
+ /**
+ * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ */
+ private int _prefetchLow;
/**
* We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
@@ -118,10 +123,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private long _lastDeliveryTag;
+ /**
+ * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode.
+ * Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow
+ */
+ private boolean _dups_ok_acknowledge_send;
+
BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetch,
- boolean exclusive, int acknowledgeMode)
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
+ int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode)
{
_channelId = channelId;
_connection = connection;
@@ -132,7 +143,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_session = session;
_protocolHandler = protocolHandler;
_rawSelectorFieldTable = rawSelectorFieldTable;
- _prefetch = prefetch;
+ _prefetchHigh = prefetchHigh;
+ _prefetchLow = prefetchLow;
_exclusive = exclusive;
_acknowledgeMode = acknowledgeMode;
}
@@ -232,7 +244,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public int getPrefetch()
{
- return _prefetch;
+ return _prefetchHigh;
+ }
+
+ public int getPrefetchHigh()
+ {
+ return _prefetchHigh;
+ }
+
+ public int getPrefetchLow()
+ {
+ return _prefetchLow;
}
public boolean isNoLocal()
@@ -309,10 +331,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
* We can get back either a Message or an exception from the queue. This method examines the argument and deals
* with it by throwing it (if an exception) or returning it (in any other case).
+ *
* @param o
* @return a message only if o is a Message
* @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not
- * a JMSException is created with the linked exception set appropriately
+ * a JMSException is created with the linked exception set appropriately
*/
private AbstractJMSMessage returnMessageOrThrow(Object o)
throws JMSException
@@ -335,7 +358,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
@@ -370,8 +393,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
* Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case
* of a message listener or a synchronous receive() caller.
+ *
* @param messageFrame the raw unprocessed mesage
- * @param channelId channel on which this message was sent
+ * @param channelId channel on which this message was sent
*/
void notifyMessage(UnprocessedMessage messageFrame, int channelId)
{
@@ -435,7 +459,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
case Session.DUPS_OK_ACKNOWLEDGE:
- if (++_outstanding >= _prefetch)
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
+
+ if (_dups_ok_acknowledge_send)
{
_session.acknowledgeMessage(msg.getDeliveryTag(), true);
}
diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
index b181490fdd..acb60f63ae 100644
--- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -47,6 +47,9 @@ import java.util.concurrent.ConcurrentMap;
*/
public class AMQProtocolSession implements ProtocolVersionList
{
+
+ private static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
+
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
@@ -59,6 +62,8 @@ public class AMQProtocolSession implements ProtocolVersionList
private final IoSession _minaProtocolSession;
+ private WriteFuture _lastWriteFuture;
+
/**
* The handler from which this session was created and which is used to handle protocol events.
* We send failover events to the handler.
@@ -255,7 +260,7 @@ public class AMQProtocolSession implements ProtocolVersionList
*/
public void writeFrame(AMQDataBlock frame)
{
- _minaProtocolSession.write(frame);
+ writeFrame(frame, false);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
@@ -265,6 +270,10 @@ public class AMQProtocolSession implements ProtocolVersionList
{
f.join();
}
+ else
+ {
+ _lastWriteFuture = f;
+ }
}
public void addSessionByChannel(int channelId, AMQSession session)
@@ -342,6 +351,12 @@ public class AMQProtocolSession implements ProtocolVersionList
public void closeProtocolSession()
{
+ _logger.debug("Waiting for last write to join.");
+ if (_lastWriteFuture != null)
+ {
+ _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ }
+
_logger.debug("Closing protocol session");
final CloseFuture future = _minaProtocolSession.close();
future.join();
diff --git a/java/client/src/org/apache/qpid/jms/Session.java b/java/client/src/org/apache/qpid/jms/Session.java
index d369c08aa1..1440ace2b6 100644
--- a/java/client/src/org/apache/qpid/jms/Session.java
+++ b/java/client/src/org/apache/qpid/jms/Session.java
@@ -42,6 +42,13 @@ public interface Session extends javax.jms.Session
boolean exclusive,
String selector) throws JMSException;
+ MessageConsumer createConsumer(Destination destination,
+ int prefetchHigh,
+ int prefetchLow,
+ boolean noLocal,
+ boolean exclusive,
+ String selector) throws JMSException;
+
/**
* @return the prefetch value used by default for consumers created on this session.
*/
diff --git a/java/client/src/org/apache/qpid/jndi/Example.properties b/java/client/src/org/apache/qpid/jndi/Example.properties
new file mode 100644
index 0000000000..ae2859f8e1
--- /dev/null
+++ b/java/client/src/org/apache/qpid/jndi/Example.properties
@@ -0,0 +1,21 @@
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialConextFactory
+
+# use the following property to configure the default connector
+#java.naming.provider.url - ignored.
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'
+
+# register some queues in JNDI using the form
+# queue.[jndiName] = [physicalName]
+queue.MyQueue = example.MyQueue
+
+# register some topics in JNDI using the form
+# topic.[jndiName] = [physicalName]
+topic.ibmStocks = stocks.nyse.ibm
+
+# Register an AMQP destination in JNDI
+# NOTE: Qpid currently only supports direct,topics and headers
+# destination.[jniName] = [BindingURL]
+destination.direct = direct://amq.direct//directQueue
diff --git a/java/client/src/org/apache/qpid/jndi/NameParserImpl.java b/java/client/src/org/apache/qpid/jndi/NameParserImpl.java
new file mode 100644
index 0000000000..a3174aec7a
--- /dev/null
+++ b/java/client/src/org/apache/qpid/jndi/NameParserImpl.java
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.jndi;
+
+import javax.naming.CompositeName;
+import javax.naming.Name;
+import javax.naming.NameParser;
+import javax.naming.NamingException;
+
+/**
+ * A default implementation of {@link NameParser}
+ * <p/>
+ * Based on class from ActiveMQ.
+ */
+public class NameParserImpl implements NameParser
+{
+ public Name parse(String name) throws NamingException
+ {
+ return new CompositeName(name);
+ }
+}
diff --git a/java/client/src/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
new file mode 100644
index 0000000000..7691a607ba
--- /dev/null
+++ b/java/client/src/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -0,0 +1,289 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jndi;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQHeadersExchange;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PropertiesFileInitialContextFactory implements InitialContextFactory
+{
+ protected final Logger _logger = Logger.getLogger(getClass());
+
+ private String CONNECTION_FACTORY_PREFIX = "connectionfactory.";
+ private String DESTINATION_PREFIX = "destination.";
+ private String QUEUE_PREFIX = "queue.";
+ private String TOPIC_PREFIX = "topic.";
+
+ public Context getInitialContext(Hashtable environment) throws NamingException
+ {
+ Map data = new ConcurrentHashMap();
+
+ createConnectionFactories(data, environment);
+
+ createDestinations(data, environment);
+
+ createQueues(data, environment);
+
+ createTopics(data, environment);
+
+ return createContext(data, environment);
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+ protected ReadOnlyContext createContext(Map data, Hashtable environment)
+ {
+ return new ReadOnlyContext(environment, data);
+ }
+
+ protected void createConnectionFactories(Map data, Hashtable environment)
+ {
+ for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ String key = entry.getKey().toString();
+ if (key.startsWith(CONNECTION_FACTORY_PREFIX))
+ {
+ String jndiName = key.substring(CONNECTION_FACTORY_PREFIX.length());
+ ConnectionFactory cf = createFactory(entry.getValue().toString());
+ if (cf != null)
+ {
+ data.put(jndiName, cf);
+ }
+ }
+ }
+ }
+
+ protected void createDestinations(Map data, Hashtable environment)
+ {
+ for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ String key = entry.getKey().toString();
+ if (key.startsWith(DESTINATION_PREFIX))
+ {
+ String jndiName = key.substring(DESTINATION_PREFIX.length());
+ Destination dest = createDestination(entry.getValue().toString());
+ if (dest != null)
+ {
+ data.put(jndiName, dest);
+ }
+ }
+ }
+ }
+
+ protected void createQueues(Map data, Hashtable environment)
+ {
+ for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ String key = entry.getKey().toString();
+ if (key.startsWith(QUEUE_PREFIX))
+ {
+ String jndiName = key.substring(QUEUE_PREFIX.length());
+ Queue q = createQueue(entry.getValue().toString());
+ if (q != null)
+ {
+ data.put(jndiName, q);
+ }
+ }
+ }
+ }
+
+ protected void createTopics(Map data, Hashtable environment)
+ {
+ for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ String key = entry.getKey().toString();
+ if (key.startsWith(TOPIC_PREFIX))
+ {
+ String jndiName = key.substring(TOPIC_PREFIX.length());
+ Topic t = createTopic(entry.getValue().toString());
+ if (t != null)
+ {
+ data.put(jndiName, t);
+ }
+ }
+ }
+ }
+
+ /**
+ * Factory method to create new Connection Factory instances
+ */
+ protected ConnectionFactory createFactory(String url)
+ {
+ try
+ {
+ return new AMQConnectionFactory(url);
+ }
+ catch (URLSyntaxException urlse)
+ {
+ _logger.warn("Unable to createFactories:" + urlse);
+ }
+ return null;
+ }
+
+ /**
+ * Factory method to create new Destination instances from an AMQP BindingURL
+ */
+ protected Destination createDestination(String bindingURL)
+ {
+ AMQBindingURL binding;
+ try
+ {
+ binding = new AMQBindingURL(bindingURL);
+ }
+ catch (URLSyntaxException urlse)
+ {
+ _logger.warn("Unable to destination:" + urlse);
+ return null;
+ }
+
+ if (binding.getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+ {
+ return createTopic(binding);
+ }
+ else if (binding.getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+ {
+ return createQueue(binding);
+ }
+ else if (binding.getExchangeClass().equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
+ {
+ return createHeaderExchange(binding);
+ }
+
+ _logger.warn("Binding: '" + binding + "' not supported");
+ return null;
+ }
+
+ /**
+ * Factory method to create new Queue instances
+ */
+ protected Queue createQueue(Object value)
+ {
+ if (value instanceof String)
+
+ {
+ return new AMQQueue((String) value);
+ }
+ else if (value instanceof BindingURL)
+
+ {
+ return new AMQQueue((BindingURL) value);
+ }
+
+ return null;
+ }
+
+ /**
+ * Factory method to create new Topic instances
+ */
+ protected Topic createTopic(Object value)
+ {
+ if (value instanceof String)
+ {
+ return new AMQTopic((String) value);
+ }
+ else if (value instanceof BindingURL)
+
+ {
+ return new AMQTopic((BindingURL) value);
+ }
+
+ return null;
+ }
+
+ /**
+ * Factory method to create new HeaderExcahnge instances
+ */
+ protected Destination createHeaderExchange(Object value)
+ {
+ if (value instanceof String)
+ {
+ return new AMQHeadersExchange((String) value);
+ }
+ else if (value instanceof BindingURL)
+ {
+ return new AMQHeadersExchange((BindingURL) value);
+ }
+
+ return null;
+ }
+
+ // Properties
+ //-------------------------------------------------------------------------
+ public String getConnectionPrefix()
+ {
+ return CONNECTION_FACTORY_PREFIX;
+ }
+
+ public void setConnectionPrefix(String connectionPrefix)
+ {
+ this.CONNECTION_FACTORY_PREFIX = connectionPrefix;
+ }
+
+ public String getDestinationPrefix()
+ {
+ return DESTINATION_PREFIX;
+ }
+
+ public void setDestinationPrefix(String destinationPrefix)
+ {
+ this.DESTINATION_PREFIX = destinationPrefix;
+ }
+
+ public String getQueuePrefix()
+ {
+ return QUEUE_PREFIX;
+ }
+
+ public void setQueuePrefix(String queuePrefix)
+ {
+ this.QUEUE_PREFIX = queuePrefix;
+ }
+
+ public String getTopicPrefix()
+ {
+ return TOPIC_PREFIX;
+ }
+
+ public void setTopicPrefix(String topicPrefix)
+ {
+ this.TOPIC_PREFIX = topicPrefix;
+ }
+}
diff --git a/java/client/src/org/apache/qpid/jndi/ReadOnlyContext.java b/java/client/src/org/apache/qpid/jndi/ReadOnlyContext.java
new file mode 100644
index 0000000000..b721aefc87
--- /dev/null
+++ b/java/client/src/org/apache/qpid/jndi/ReadOnlyContext.java
@@ -0,0 +1,497 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.jndi;
+
+import javax.naming.*;
+import javax.naming.spi.NamingManager;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Based on class from ActiveMQ.
+ * A read-only Context
+ * <p/>
+ * This version assumes it and all its subcontext are read-only and any attempt
+ * to modify (e.g. through bind) will result in an OperationNotSupportedException.
+ * Each Context in the tree builds a cache of the entries in all sub-contexts
+ * to optimise the performance of lookup.
+ * </p>
+ * <p>This implementation is intended to optimise the performance of lookup(String)
+ * to about the level of a HashMap get. It has been observed that the scheme
+ * resolution phase performed by the JVM takes considerably longer, so for
+ * optimum performance lookups should be coded like:</p>
+ * <code>
+ * Context componentContext = (Context)new InitialContext().lookup("java:comp");
+ * String envEntry = (String) componentContext.lookup("env/myEntry");
+ * String envEntry2 = (String) componentContext.lookup("env/myEntry2");
+ * </code>
+ *
+ * @version $Revision: 1.2 $ $Date: 2005/08/27 03:52:39 $
+ */
+public class ReadOnlyContext implements Context, Serializable
+{
+ private static final long serialVersionUID = -5754338187296859149L;
+ protected static final NameParser nameParser = new NameParserImpl();
+
+ protected final Hashtable environment; // environment for this context
+ protected final Map bindings; // bindings at my level
+ protected final Map treeBindings; // all bindings under me
+
+ private boolean frozen = false;
+ private String nameInNamespace = "";
+ public static final String SEPARATOR = "/";
+
+ public ReadOnlyContext()
+ {
+ environment = new Hashtable();
+ bindings = new HashMap();
+ treeBindings = new HashMap();
+ }
+
+ public ReadOnlyContext(Hashtable env)
+ {
+ if (env == null)
+ {
+ this.environment = new Hashtable();
+ }
+ else
+ {
+ this.environment = new Hashtable(env);
+ }
+ this.bindings = Collections.EMPTY_MAP;
+ this.treeBindings = Collections.EMPTY_MAP;
+ }
+
+ public ReadOnlyContext(Hashtable environment, Map bindings)
+ {
+ if (environment == null)
+ {
+ this.environment = new Hashtable();
+ }
+ else
+ {
+ this.environment = new Hashtable(environment);
+ }
+ this.bindings = bindings;
+ treeBindings = new HashMap();
+ frozen = true;
+ }
+
+ public ReadOnlyContext(Hashtable environment, Map bindings, String nameInNamespace)
+ {
+ this(environment, bindings);
+ this.nameInNamespace = nameInNamespace;
+ }
+
+ protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env)
+ {
+ this.bindings = clone.bindings;
+ this.treeBindings = clone.treeBindings;
+ this.environment = new Hashtable(env);
+ }
+
+ protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env, String nameInNamespace)
+ {
+ this(clone, env);
+ this.nameInNamespace = nameInNamespace;
+ }
+
+ public void freeze()
+ {
+ frozen = true;
+ }
+
+ boolean isFrozen()
+ {
+ return frozen;
+ }
+
+ /**
+ * internalBind is intended for use only during setup or possibly by suitably synchronized superclasses.
+ * It binds every possible lookup into a map in each context. To do this, each context
+ * strips off one name segment and if necessary creates a new context for it. Then it asks that context
+ * to bind the remaining name. It returns a map containing all the bindings from the next context, plus
+ * the context it just created (if it in fact created it). (the names are suitably extended by the segment
+ * originally lopped off).
+ *
+ * @param name
+ * @param value
+ * @return
+ * @throws javax.naming.NamingException
+ */
+ protected Map internalBind(String name, Object value) throws NamingException
+ {
+ assert name != null && name.length() > 0;
+ assert!frozen;
+
+ Map newBindings = new HashMap();
+ int pos = name.indexOf('/');
+ if (pos == -1)
+ {
+ if (treeBindings.put(name, value) != null)
+ {
+ throw new NamingException("Something already bound at " + name);
+ }
+ bindings.put(name, value);
+ newBindings.put(name, value);
+ }
+ else
+ {
+ String segment = name.substring(0, pos);
+ assert segment != null;
+ assert!segment.equals("");
+ Object o = treeBindings.get(segment);
+ if (o == null)
+ {
+ o = newContext();
+ treeBindings.put(segment, o);
+ bindings.put(segment, o);
+ newBindings.put(segment, o);
+ }
+ else if (!(o instanceof ReadOnlyContext))
+ {
+ throw new NamingException("Something already bound where a subcontext should go");
+ }
+ ReadOnlyContext readOnlyContext = (ReadOnlyContext) o;
+ String remainder = name.substring(pos + 1);
+ Map subBindings = readOnlyContext.internalBind(remainder, value);
+ for (Iterator iterator = subBindings.entrySet().iterator(); iterator.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ String subName = segment + "/" + (String) entry.getKey();
+ Object bound = entry.getValue();
+ treeBindings.put(subName, bound);
+ newBindings.put(subName, bound);
+ }
+ }
+ return newBindings;
+ }
+
+ protected ReadOnlyContext newContext()
+ {
+ return new ReadOnlyContext();
+ }
+
+ public Object addToEnvironment(String propName, Object propVal) throws NamingException
+ {
+ return environment.put(propName, propVal);
+ }
+
+ public Hashtable getEnvironment() throws NamingException
+ {
+ return (Hashtable) environment.clone();
+ }
+
+ public Object removeFromEnvironment(String propName) throws NamingException
+ {
+ return environment.remove(propName);
+ }
+
+ public Object lookup(String name) throws NamingException
+ {
+ if (name.length() == 0)
+ {
+ return this;
+ }
+ Object result = treeBindings.get(name);
+ if (result == null)
+ {
+ result = bindings.get(name);
+ }
+ if (result == null)
+ {
+ int pos = name.indexOf(':');
+ if (pos > 0)
+ {
+ String scheme = name.substring(0, pos);
+ Context ctx = NamingManager.getURLContext(scheme, environment);
+ if (ctx == null)
+ {
+ throw new NamingException("scheme " + scheme + " not recognized");
+ }
+ return ctx.lookup(name);
+ }
+ else
+ {
+ // Split out the first name of the path
+ // and look for it in the bindings map.
+ CompositeName path = new CompositeName(name);
+
+ if (path.size() == 0)
+ {
+ return this;
+ }
+ else
+ {
+ String first = path.get(0);
+ Object obj = bindings.get(first);
+ if (obj == null)
+ {
+ throw new NameNotFoundException(name);
+ }
+ else if (obj instanceof Context && path.size() > 1)
+ {
+ Context subContext = (Context) obj;
+ obj = subContext.lookup(path.getSuffix(1));
+ }
+ return obj;
+ }
+ }
+ }
+ if (result instanceof LinkRef)
+ {
+ LinkRef ref = (LinkRef) result;
+ result = lookup(ref.getLinkName());
+ }
+ if (result instanceof Reference)
+ {
+ try
+ {
+ result = NamingManager.getObjectInstance(result, null, null, this.environment);
+ }
+ catch (NamingException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw(NamingException) new NamingException("could not look up : " + name).initCause(e);
+ }
+ }
+ if (result instanceof ReadOnlyContext)
+ {
+ String prefix = getNameInNamespace();
+ if (prefix.length() > 0)
+ {
+ prefix = prefix + SEPARATOR;
+ }
+ result = new ReadOnlyContext((ReadOnlyContext) result, environment, prefix + name);
+ }
+ return result;
+ }
+
+ public Object lookup(Name name) throws NamingException
+ {
+ return lookup(name.toString());
+ }
+
+ public Object lookupLink(String name) throws NamingException
+ {
+ return lookup(name);
+ }
+
+ public Name composeName(Name name, Name prefix) throws NamingException
+ {
+ Name result = (Name) prefix.clone();
+ result.addAll(name);
+ return result;
+ }
+
+ public String composeName(String name, String prefix) throws NamingException
+ {
+ CompositeName result = new CompositeName(prefix);
+ result.addAll(new CompositeName(name));
+ return result.toString();
+ }
+
+ public NamingEnumeration list(String name) throws NamingException
+ {
+ Object o = lookup(name);
+ if (o == this)
+ {
+ return new ListEnumeration();
+ }
+ else if (o instanceof Context)
+ {
+ return ((Context) o).list("");
+ }
+ else
+ {
+ throw new NotContextException();
+ }
+ }
+
+ public NamingEnumeration listBindings(String name) throws NamingException
+ {
+ Object o = lookup(name);
+ if (o == this)
+ {
+ return new ListBindingEnumeration();
+ }
+ else if (o instanceof Context)
+ {
+ return ((Context) o).listBindings("");
+ }
+ else
+ {
+ throw new NotContextException();
+ }
+ }
+
+ public Object lookupLink(Name name) throws NamingException
+ {
+ return lookupLink(name.toString());
+ }
+
+ public NamingEnumeration list(Name name) throws NamingException
+ {
+ return list(name.toString());
+ }
+
+ public NamingEnumeration listBindings(Name name) throws NamingException
+ {
+ return listBindings(name.toString());
+ }
+
+ public void bind(Name name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void bind(String name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void close() throws NamingException
+ {
+ // ignore
+ }
+
+ public Context createSubcontext(Name name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public Context createSubcontext(String name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void destroySubcontext(Name name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void destroySubcontext(String name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public String getNameInNamespace() throws NamingException
+ {
+ return nameInNamespace;
+ }
+
+ public NameParser getNameParser(Name name) throws NamingException
+ {
+ return nameParser;
+ }
+
+ public NameParser getNameParser(String name) throws NamingException
+ {
+ return nameParser;
+ }
+
+ public void rebind(Name name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void rebind(String name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void rename(Name oldName, Name newName) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void rename(String oldName, String newName) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void unbind(Name name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void unbind(String name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ private abstract class LocalNamingEnumeration implements NamingEnumeration
+ {
+ private Iterator i = bindings.entrySet().iterator();
+
+ public boolean hasMore() throws NamingException
+ {
+ return i.hasNext();
+ }
+
+ public boolean hasMoreElements()
+ {
+ return i.hasNext();
+ }
+
+ protected Map.Entry getNext()
+ {
+ return (Map.Entry) i.next();
+ }
+
+ public void close() throws NamingException
+ {
+ }
+ }
+
+ private class ListEnumeration extends LocalNamingEnumeration
+ {
+ public Object next() throws NamingException
+ {
+ return nextElement();
+ }
+
+ public Object nextElement()
+ {
+ Map.Entry entry = getNext();
+ return new NameClassPair((String) entry.getKey(), entry.getValue().getClass().getName());
+ }
+ }
+
+ private class ListBindingEnumeration extends LocalNamingEnumeration
+ {
+ public Object next() throws NamingException
+ {
+ return nextElement();
+ }
+
+ public Object nextElement()
+ {
+ Map.Entry entry = getNext();
+ return new Binding((String) entry.getKey(), entry.getValue());
+ }
+ }
+}
diff --git a/java/client/test/src/org/apache/qpid/headers/MessageFactory.java b/java/client/test/src/org/apache/qpid/headers/MessageFactory.java
index fcbfe8abf2..fc976766e4 100644
--- a/java/client/test/src/org/apache/qpid/headers/MessageFactory.java
+++ b/java/client/test/src/org/apache/qpid/headers/MessageFactory.java
@@ -17,10 +17,15 @@
*/
package org.apache.qpid.headers;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.FieldTable;
-import javax.jms.*;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
/**
*/
@@ -46,7 +51,7 @@ class MessageFactory
}
_session = session;
_payload = new byte[payloadSize];
- for(int i = 0; i < _payload.length; i++)
+ for (int i = 0; i < _payload.length; i++)
{
_payload[i] = (byte) DATA[i % DATA.length];
}
@@ -156,7 +161,7 @@ class MessageFactory
private static Message setHeaders(Message m, String[] headers) throws JMSException
{
- for(int i = 0; i < headers.length; i++)
+ for (int i = 0; i < headers.length; i++)
{
// the value in GRM is 5 bytes
m.setStringProperty(headers[i], "value");
diff --git a/java/client/test/src/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java b/java/client/test/src/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
new file mode 100644
index 0000000000..27a92d334a
--- /dev/null
+++ b/java/client/test/src/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.test.unit.jndi;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Properties;
+
+public class PropertiesFileInitialContextFactoryTest
+{
+ InitialContextFactory contextFactory;
+ Properties _properties;
+
+ @Before
+ public void setupProperties()
+ {
+ _properties = new Properties();
+ _properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+ _properties.put("connectionfactory.local", "amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'");
+ _properties.put("queue.MyQueue", "example.MyQueue");
+ _properties.put("topic.ibmStocks", "stocks.nyse.ibm");
+ _properties.put("destination.direct", "direct://amq.direct//directQueue");
+ }
+
+ @Test
+ public void test()
+ {
+ Context ctx = null;
+ try
+ {
+ ctx = new InitialContext(_properties);
+ }
+ catch (NamingException ne)
+ {
+ Assert.fail("Error loading context:" + ne);
+ }
+
+ try
+ {
+ AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local");
+ Assert.assertEquals("amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'", cf.getConnectionURL().toString());
+ }
+ catch (NamingException ne)
+ {
+ Assert.fail("Unable to create Connection Factory:" + ne);
+ }
+
+ try
+ {
+ AMQQueue queue = (AMQQueue) ctx.lookup("MyQueue");
+ Assert.assertEquals("example.MyQueue", queue.getRoutingKey());
+ }
+ catch (NamingException ne)
+ {
+ Assert.fail("Unable to create queue:" + ne);
+ }
+
+ try
+ {
+ AMQTopic topic = (AMQTopic) ctx.lookup("ibmStocks");
+ Assert.assertEquals("stocks.nyse.ibm", topic.getTopicName());
+ }
+ catch (Exception ne)
+ {
+ Assert.fail("Unable to create topic:" + ne);
+ }
+
+ try
+ {
+ AMQQueue direct = (AMQQueue) ctx.lookup("direct");
+ Assert.assertEquals("directQueue", direct.getRoutingKey());
+ }
+ catch (NamingException ne)
+ {
+ Assert.fail("Unable to create direct destination:" + ne);
+ }
+
+
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(PropertiesFileInitialContextFactoryTest.class);
+ }
+}
diff --git a/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java b/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java
index 86744d162f..200c89bebe 100644
--- a/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java
+++ b/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java
@@ -120,6 +120,7 @@ public class TransactedTest
expect("Z", testConsumer2.receive(1000));
assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
}
@Test
@@ -140,6 +141,7 @@ public class TransactedTest
expect("B", consumer.receive(1000));
expect("C", consumer.receive(1000));
+ assertTrue(null == testConsumer1.receive(1000));
assertTrue(null == testConsumer2.receive(1000));
}
diff --git a/java/common/bin/qpid-run b/java/common/bin/qpid-run
index a45b5a9692..7db32acebb 100644
--- a/java/common/bin/qpid-run
+++ b/java/common/bin/qpid-run
@@ -15,6 +15,12 @@
# limitations under the License.
#
+# Test if we're running on cygwin.
+cygwin=false
+if [[ "$(uname -a | fgrep Cygwin)" != "" ]]; then
+ cygwin=true
+fi
+
die() {
if [[ $1 = -usage ]]; then
shift
@@ -41,6 +47,42 @@ if [ -z "$QPID_WORK" ]; then
QPID_WORK=$HOME
fi
+if $cygwin; then
+ QPID_HOME=$(cygpath -w $QPID_HOME)
+ QPID_WORK=$(cygpath -w $QPID_WORK)
+fi
+
+#Set the default system properties that we'll use now that they have
+#all been initialised
+SYSTEM_PROPS="-Damqj.logging.level=$AMQJ_LOGGING_LEVEL -DQPID_HOME=$QPID_HOME -DQPID_WORK=$QPID_WORK"
+
+#If logprefix or logsuffix set to use PID make that happen
+#Otherwise just pass the value through for these props
+#Using X character to avoid probs with empty strings
+if [ -n "$QPID_LOG_PREFIX" ]; then
+ if [ "X$QPID_LOG_PREFIX" = "XPID" ]; then
+ echo Using pid in qpid log name prefix
+ LOG_PREFIX=" -Dlogprefix=$$"
+ else
+ echo Using qpid logprefix property
+ LOG_PREFIX=" -Dlogprefix=$QPID_LOG_PREFIX"
+ fi
+ SYSTEM_PROPS+=$LOG_PREFIX
+fi
+
+if [ -n "$QPID_LOG_SUFFIX" ]; then
+ if [ "X$QPID_LOG_SUFFIX" = "XPID" ]; then
+ echo Using pid in qpid log name suffix
+ LOG_SUFFIX=" -Dlogsuffix=$$"
+ else
+ echo Using qpig logsuffix property
+ LOG_SUFFIX=" -Dlogsuffix=$QPID_LOG_SUFFIX"
+ fi
+ SYSTEM_PROPS+=$LOG_SUFFIX
+fi
+
+echo System Properties set to $SYSTEM_PROPS
+
program=$(basename $0)
sourced=${BASH_SOURCE[0]}
if [[ -z ${sourced:-''} ]]; then
@@ -58,11 +100,6 @@ usage() {
sed "s/^--$//"
}
-cygwin=false
-if [[ "$(uname -a | fgrep Cygwin)" != "" ]]; then
- cygwin=true
-fi
-
export EXTERNAL_CLASSPATH=$CLASSPATH
unset CLASSPATH
@@ -175,12 +212,10 @@ for arg in "${RUN_ARGS[@]}"; do
done
if $cygwin; then
- QPID_HOME=$(cygpath -w $QPID_HOME)
CLASSPATH=$(cygpath -w -p $CLASSPATH)
- QPID_WORK=$(cygpath -w $QPID_WORK)
JAVA=$(cygpath -u $JAVA)
fi
-COMMAND=($JAVA $JAVA_VM $JAVA_MEM -Damqj.logging.level=$AMQJ_LOGGING_LEVEL -DQPID_HOME=$QPID_HOME -DQPID_WORK=$QPID_WORK $JAVA_OPTS $QPID_OPTS "${JAVA_ARGS[@]}")
+COMMAND=($JAVA $JAVA_VM $JAVA_MEM $SYSTEM_PROPS $JAVA_OPTS $QPID_OPTS "${JAVA_ARGS[@]}")
DISPATCH
diff --git a/java/common/src/org/apache/qpid/framing/ContentBody.java b/java/common/src/org/apache/qpid/framing/ContentBody.java
index a345d1d225..d7b668534c 100644
--- a/java/common/src/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/org/apache/qpid/framing/ContentBody.java
@@ -32,7 +32,7 @@ public class ContentBody extends AMQBody
public int getSize()
{
- return (payload == null?0:payload.limit());
+ return (payload == null ? 0 : payload.limit());
}
public void writePayload(ByteBuffer buffer)
@@ -49,8 +49,27 @@ public class ContentBody extends AMQBody
if (size > 0)
{
payload = buffer.slice();
- payload.limit((int)size);
- buffer.skip((int)size);
+ payload.limit((int) size);
+ buffer.skip((int) size);
+ }
+
+ }
+
+ public void reduceBufferToFit()
+ {
+ if (payload != null && (payload.remaining() < payload.capacity() / 2))
+ {
+ int size = payload.limit();
+ ByteBuffer newPayload = ByteBuffer.allocate(size);
+
+ newPayload.put(payload);
+ newPayload.position(0);
+ newPayload.limit(size);
+
+ //reduce reference count on payload
+ payload.release();
+
+ payload = newPayload;
}
}