summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-01 15:47:08 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-01 15:47:08 +0000
commita9b950ac164bb7e2dd05ae44f99d4b728697ad65 (patch)
treef04a672165555034659f1beab0c140615ed32d67
parent1811ebfb05944ab40e9a4490bc3f797087d98cb3 (diff)
downloadqpid-python-a9b950ac164bb7e2dd05ae44f99d4b728697ad65.tar.gz
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1563433 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/amqp-1-0-client-websocket/pom.xml20
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java63
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java62
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java17
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java42
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java27
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java67
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java88
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java5
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java5
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java18
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java25
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java42
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java170
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java48
-rw-r--r--java/broker-plugins/management-http/pom.xml20
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java4
-rw-r--r--java/broker-plugins/websocket/pom.xml20
-rw-r--r--java/build.deps18
-rw-r--r--java/ivy.retrieve.xml18
-rw-r--r--java/jca/build.xml2
-rw-r--r--java/jca/example/build-geronimo-properties.xml3
-rw-r--r--java/jca/pom.xml4
-rw-r--r--java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml (renamed from java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml)4
-rw-r--r--java/lib/poms/jetty-continuation-8.1.14.v20131031.xml (renamed from java/lib/poms/jetty-continuation-7.6.10.v20130312.xml)2
-rw-r--r--java/lib/poms/jetty-http-8.1.14.v20131031.xml (renamed from java/lib/poms/jetty-http-7.6.10.v20130312.xml)2
-rw-r--r--java/lib/poms/jetty-io-8.1.14.v20131031.xml (renamed from java/lib/poms/jetty-io-7.6.10.v20130312.xml)2
-rw-r--r--java/lib/poms/jetty-security-8.1.14.v20131031.xml (renamed from java/lib/poms/jetty-security-7.6.10.v20130312.xml)2
-rw-r--r--java/lib/poms/jetty-server-8.1.14.v20131031.xml (renamed from java/lib/poms/jetty-server-7.6.10.v20130312.xml)2
-rw-r--r--java/lib/poms/jetty-servlet-8.1.14.v20131031.xml (renamed from java/lib/poms/jetty-servlet-7.6.10.v20130312.xml)2
-rw-r--r--java/lib/poms/jetty-util-8.1.14.v20131031.xml (renamed from java/lib/poms/jetty-util-7.6.10.v20130312.xml)2
-rw-r--r--java/lib/poms/jetty-websocket-8.1.14.v20131031.xml (renamed from java/lib/poms/jetty-websocket-7.6.10.v20130312.xml)2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java24
33 files changed, 401 insertions, 431 deletions
diff --git a/java/amqp-1-0-client-websocket/pom.xml b/java/amqp-1-0-client-websocket/pom.xml
index 205e0d5ab7..3862fb0fc5 100644
--- a/java/amqp-1-0-client-websocket/pom.xml
+++ b/java/amqp-1-0-client-websocket/pom.xml
@@ -44,15 +44,15 @@
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-servlet_2.5_spec</artifactId>
- <version>1.2</version>
+ <artifactId>geronimo-servlet_3.0_spec</artifactId>
+ <version>1.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -73,14 +73,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -93,7 +93,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -105,7 +105,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -118,7 +118,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -131,14 +131,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index b00d98637e..6a959df440 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -33,12 +33,14 @@ import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -374,9 +376,9 @@ public abstract class AbstractExchange implements Exchange
return getBindings().size();
}
- @Override
- public final List<? extends BaseQueue> route(final ServerMessage message,
- final InstanceProperties instanceProperties)
+
+ final List<? extends BaseQueue> route(final ServerMessage message,
+ final InstanceProperties instanceProperties)
{
_receivedMessageCount.incrementAndGet();
_receivedMessageSize.addAndGet(message.getSize());
@@ -416,6 +418,59 @@ public abstract class AbstractExchange implements Exchange
return queues;
}
+ public final int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final BaseQueue.PostEnqueueAction postEnqueueAction)
+ {
+ List<? extends BaseQueue> queues = route(message, instanceProperties);
+
+ if(queues == null || queues.isEmpty())
+ {
+ Exchange altExchange = getAlternateExchange();
+ if(altExchange != null)
+ {
+ return altExchange.send(message, instanceProperties, txn, postEnqueueAction);
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ else
+ {
+ final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+
+ txn.enqueue(queues,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ for(int i = 0; i < baseQueues.length; i++)
+ {
+ try
+ {
+ baseQueues[i].enqueue(message, postEnqueueAction);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ _reference.release();
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return queues.size();
+ }
+ }
+
protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message,
final InstanceProperties instanceProperties);
@@ -679,4 +734,6 @@ public abstract class AbstractExchange implements Exchange
public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
}
+
+
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index e2582019cd..71d0f8b4dd 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -36,11 +36,14 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchange implements Exchange
@@ -204,22 +207,6 @@ public class DefaultExchange implements Exchange
}
@Override
- public List<AMQQueue> route(ServerMessage message, final InstanceProperties instanceProperties)
- {
- AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
- if(q == null)
- {
- List<AMQQueue> noQueues = Collections.emptyList();
- return noQueues;
- }
- else
- {
- return Collections.singletonList(q);
- }
-
- }
-
- @Override
public boolean isBound(AMQQueue queue)
{
return _virtualHost.getQueue(queue.getName()) == queue;
@@ -343,4 +330,47 @@ public class DefaultExchange implements Exchange
{
return _id;
}
+
+ public final int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final BaseQueue.PostEnqueueAction postEnqueueAction)
+ {
+ final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
+ if(q == null)
+ {
+ return 0;
+ }
+ else
+ {
+ txn.enqueue(q,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ try
+ {
+ q.enqueue(message, postEnqueueAction);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _reference.release();
+ }
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return 1;
+ }
+ }
+
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 78455c9261..18e912e972 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -94,13 +95,17 @@ public interface Exchange extends ExchangeReferrer
void close() throws AMQException;
/**
- * Returns a list of queues to which to route this message. If there are
- * no queues the empty list must be returned.
- *
- * @return list of queues to which to route the message.
+ * Routes a message
+ * @param message the message to be routed
+ * @param instanceProperties the instance properties
+ * @param txn the transaction to enqueue within
+ * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
+ * @return the number of queues in which the message was enqueued performed
*/
- List<? extends BaseQueue> route(ServerMessage message, final InstanceProperties instanceProperties);
-
+ int send(ServerMessage message,
+ InstanceProperties instanceProperties,
+ ServerTransaction txn,
+ BaseQueue.PostEnqueueAction postEnqueueAction);
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
new file mode 100644
index 0000000000..afd7ff0269
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+
+public interface MessageInstance
+{
+
+ boolean isAvailable();
+
+ boolean acquire();
+
+ boolean isAcquired();
+
+ void release();
+
+ void delete();
+
+ boolean isDeleted();
+
+ ServerMessage getMessage();
+
+ InstanceProperties getInstanceProperties();
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 80ccbe1649..2aa1d1f473 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -22,11 +22,11 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.ServerTransaction;
-public interface QueueEntry extends Comparable<QueueEntry>
+public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
@@ -177,26 +177,17 @@ public interface QueueEntry extends Comparable<QueueEntry>
AMQQueue getQueue();
- ServerMessage getMessage();
-
long getSize();
boolean getDeliveredToConsumer();
boolean expired() throws AMQException;
- boolean isAvailable();
-
- boolean isAcquired();
-
- boolean acquire();
boolean acquire(Subscription sub);
boolean acquiredBySubscription();
boolean isAcquiredBy(Subscription subscription);
- void release();
-
void setRedelivered();
boolean isRedelivered();
@@ -207,16 +198,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
boolean isRejectedBy(long subscriptionId);
- void delete();
-
- /**
- * Returns true if entry is either DEQUED or DELETED state.
- *
- * @return true if entry is either DEQUED or DELETED state
- */
- boolean isDeleted();
-
- void routeToAlternate();
+ int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn);
boolean isQueueDeleted();
@@ -241,5 +223,4 @@ public interface QueueEntry extends Comparable<QueueEntry>
Filterable asFilterable();
- InstanceProperties getInstanceProperties();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ed61f1acf6..461d493437 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -34,7 +34,6 @@ import org.apache.qpid.server.txn.ServerTransaction;
import java.util.EnumMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -250,7 +249,7 @@ public abstract class QueueEntryImpl implements QueueEntry
}
else if(acquire())
{
- routeToAlternate();
+ routeToAlternate(null, null);
}
}
@@ -368,65 +367,43 @@ public abstract class QueueEntryImpl implements QueueEntry
dispose();
}
- public void routeToAlternate()
+ public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
-
+ boolean autocommit = txn == null;
if (alternateExchange != null)
{
- List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), getInstanceProperties());
- final ServerMessage message = getMessage();
- if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
+ if(autocommit)
{
- queues = alternateExchange.getAlternateExchange().route(getMessage(), getInstanceProperties());
+ txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
}
+ int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action);
-
- if (queues != null && queues.size() != 0)
+ txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
{
- final List<? extends BaseQueue> rerouteQueues = queues;
- ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
-
- txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
+ public void postCommit()
{
- public void postCommit()
- {
- try
- {
- for (BaseQueue queue : rerouteQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
-
- }
- });
-
- txn.dequeue(currentQueue, message, new ServerTransaction.Action()
- {
- public void postCommit()
- {
- delete();
- }
+ delete();
+ }
- public void onRollback()
- {
+ public void onRollback()
+ {
- }
- });
+ }
+ });
+ if(autocommit)
+ {
txn.commit();
}
+ return enqueues;
+
+ }
+ else
+ {
+ return 0;
}
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d63d1946d3..87d11a892e 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1355,93 +1355,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
- if(_alternateExchange != null)
+
+ for(final QueueEntry entry : entries)
{
+ // TODO log requeues with a post enqueue action
+ int requeues = entry.routeToAlternate(null, txn);
- for(final QueueEntry entry : entries)
+ if(requeues == 0)
{
-
- List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
- if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
- {
- queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(), entry.getInstanceProperties());
- }
-
- final ServerMessage message = entry.getMessage();
- if(queues != null && queues.size() != 0)
- {
- final List<? extends BaseQueue> rerouteQueues = queues;
- txn.enqueue(rerouteQueues, entry.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- try
- {
- for(BaseQueue queue : rerouteQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public void onRollback()
- {
-
- }
- });
- txn.dequeue(this, entry.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- entry.delete();
- }
-
- public void onRollback()
- {
- }
- });
- }
-
+ // TODO log discard
}
-
- _alternateExchange.removeReference(this);
}
- else
- {
- // TODO log discard
-
- for(final QueueEntry entry : entries)
- {
- final ServerMessage message = entry.getMessage();
- if(message != null)
- {
- txn.dequeue(this, message,
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- entry.delete();
- }
+ txn.commit();
- public void onRollback()
- {
- }
- });
- }
- }
+ if(_alternateExchange != null)
+ {
+ _alternateExchange.removeReference(this);
}
- txn.commit();
for (Task task : _deleteTaskList)
{
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index 7bd525c90f..764549626a 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -312,10 +312,9 @@ public class TopicExchangeTest extends QpidTestCase
private int routeMessage(String routingKey, long messageNumber) throws AMQException
{
- ServerMessage serverMessage = mock(ServerMessage.class);
- when(serverMessage.getRoutingKey()).thenReturn(routingKey);
- List<? extends BaseQueue> queues = _exchange.route(serverMessage, InstanceProperties.EMPTY);
ServerMessage message = mock(ServerMessage.class);
+ when(message.getRoutingKey()).thenReturn(routingKey);
+ List<? extends BaseQueue> queues = _exchange.route(message, InstanceProperties.EMPTY);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 2e3231e208..d3c866f747 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.ServerTransaction;
public class MockQueueEntry implements QueueEntry
{
@@ -62,9 +63,9 @@ public class MockQueueEntry implements QueueEntry
}
- public void routeToAlternate()
+ public int routeToAlternate(final BaseQueue.PostEnqueueAction action, final ServerTransaction txn)
{
-
+ return 0;
}
public boolean expired() throws AMQException
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index fe82f65115..bae5616042 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -53,6 +54,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -102,6 +104,14 @@ public class ServerSession extends Session
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
+ private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry entry)
+ {
+ entry.getQueue().checkCapacity(ServerSession.this);
+ }
+ };
public static interface MessageDispositionChangeListener
{
@@ -182,7 +192,9 @@ public class ServerSession extends Session
return isCommandsFull(id);
}
- public void enqueue(final MessageTransferMessage message, final List<? extends BaseQueue> queues)
+ public int enqueue(final MessageTransferMessage message,
+ final InstanceProperties instanceProperties,
+ final Exchange exchange)
{
if(_outstandingCredit.get() != UNLIMITED_CREDIT
&& _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -190,10 +202,10 @@ public class ServerSession extends Session
_outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
+ int enqueues = exchange.send(message, instanceProperties, _transaction, _checkCapacityAction);
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
- _transaction.enqueue(queues,message, postTransactionAction);
incrementOutstandingTxnsIfNecessary();
+ return enqueues;
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 973f706e0a..dcca696529 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -39,7 +39,6 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -337,28 +336,10 @@ public class ServerSessionDelegate extends SessionDelegate
}
};
- List<? extends BaseQueue> queues = exchange.route(message, instanceProperties);
- if(queues.isEmpty() && exchange.getAlternateExchange() != null)
- {
- final Exchange alternateExchange = exchange.getAlternateExchange();
- queues = alternateExchange.route(message, instanceProperties);
- if (!queues.isEmpty())
- {
- exchangeInUse = alternateExchange;
- }
- else
- {
- exchangeInUse = exchange;
- }
- }
- else
- {
- exchangeInUse = exchange;
- }
+ int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
- if(!queues.isEmpty())
+ if(enqueues != 0)
{
- serverSession.enqueue(message, queues);
storeMessage.flushToStore();
}
else
@@ -372,7 +353,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
+ serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey()));
}
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index 17d0e5cb64..357b565365 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -59,7 +59,6 @@ import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -544,7 +543,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
void reject(final QueueEntry entry)
{
entry.setRedelivered();
- entry.routeToAlternate();
+ entry.routeToAlternate(null, null);
if(entry.isAcquiredBy(this))
{
entry.delete();
@@ -575,35 +574,36 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
protected void sendToDLQOrDiscard(QueueEntry entry)
{
- final Exchange alternateExchange = entry.getQueue().getAlternateExchange();
final LogActor logActor = CurrentActor.get();
final ServerMessage msg = entry.getMessage();
- if (alternateExchange != null)
+
+ int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry requeueEntry)
+ {
+ logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getQueue().getName()));
+ }
+ }, null);
+
+ if (requeues == 0)
{
- final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
+ final AMQQueue queue = entry.getQueue();
+ final Exchange alternateExchange = queue.getAlternateExchange();
- if (destinationQueues == null || destinationQueues.isEmpty())
+ if(alternateExchange != null)
{
- entry.delete();
-
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName()));
+ logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
+ alternateExchange.getName()));
}
else
{
- entry.routeToAlternate();
-
- //output operational logging for each delivery post commit
- for (final BaseQueue destinationQueue : destinationQueues)
- {
- logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName()));
- }
+ logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
+ queue.getName(),
+ msg.getRoutingKey()));
}
}
- else
- {
- entry.delete();
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey()));
- }
}
private boolean isMaxDeliveryLimitReached(QueueEntry entry)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b7dc105cb7..c6d4151628 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -165,6 +165,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private final TransactionTimeoutHelper _transactionTimeoutHelper;
private final UUID _id = UUID.randomUUID();
+
+ private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
+ private final ImmediateAction _immediateAction = new ImmediateAction();
+
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
@@ -330,6 +335,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
else
{
+ final boolean immediate = _currentMessage.getMessagePublishInfo().isImmediate();
+
final InstanceProperties instanceProperties =
new InstanceProperties()
{
@@ -341,7 +348,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
case EXPIRATION:
return amqMessage.getExpiration();
case IMMEDIATE:
- return _currentMessage.getMessagePublishInfo().isImmediate();
+ return immediate;
case PERSISTENT:
return amqMessage.isPersistent();
case MANDATORY:
@@ -353,21 +360,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
};
- final List<? extends BaseQueue> destinationQueues =
- _currentMessage.getExchange().route(amqMessage, instanceProperties);
-
- if(destinationQueues == null || destinationQueues.isEmpty())
+ int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+ immediate ? _immediateAction : _capacityCheckAction);
+ if(enqueues == 0)
{
handleUnroutableMessage(amqMessage);
}
else
{
- _transaction.enqueue(destinationQueues,
- amqMessage,
- new MessageDeliveryAction(amqMessage, destinationQueues));
incrementOutstandingTxnsIfNecessary();
handle.flushToStore();
-
}
}
}
@@ -1258,7 +1260,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if(immediate)
{
- action = new ImmediateAction(queue);
+ action = new ImmediateAction();
}
else
{
@@ -1291,58 +1293,72 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_reference.release();
}
- private class ImmediateAction implements BaseQueue.PostEnqueueAction
+
+ }
+ private class ImmediateAction implements BaseQueue.PostEnqueueAction
+ {
+
+ public ImmediateAction()
{
- private final BaseQueue _queue;
+ }
- public ImmediateAction(BaseQueue queue)
- {
- _queue = queue;
- }
+ public void onEnqueue(QueueEntry entry)
+ {
+ AMQQueue queue = entry.getQueue();
- public void onEnqueue(QueueEntry entry)
+ if (!entry.getDeliveredToConsumer() && entry.acquire())
{
- if (!entry.getDeliveredToConsumer() && entry.acquire())
- {
-
- ServerTransaction txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
- final AMQMessage message = (AMQMessage) entry.getMessage();
- txn.dequeue(_queue, entry.getMessage(),
- new MessageAcknowledgeAction(entries)
+ ServerTransaction txn = new LocalTransaction(_messageStore);
+ Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
+ entries.add(entry);
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ txn.dequeue(queue, entry.getMessage(),
+ new MessageAcknowledgeAction(entries)
+ {
+ @Override
+ public void postCommit()
{
- @Override
- public void postCommit()
+ try
{
- try
- {
- final
- ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
-
- outputConverter.writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- IMMEDIATE_DELIVERY_REPLY_TEXT);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- super.postCommit();
+ final
+ ProtocolOutputConverter outputConverter =
+ _session.getProtocolOutputConverter();
+
+ outputConverter.writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ IMMEDIATE_DELIVERY_REPLY_TEXT);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
}
+ super.postCommit();
}
- );
- txn.commit();
-
+ }
+ );
+ txn.commit();
- }
}
+ else
+ {
+ queue.checkCapacity(AMQChannel.this);
+ }
+
+ }
+ }
+
+ private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction
+ {
+ @Override
+ public void onEnqueue(final QueueEntry entry)
+ {
+ AMQQueue queue = entry.getQueue();
+ queue.checkCapacity(AMQChannel.this);
}
}
@@ -1550,48 +1566,46 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void deadLetter(long deliveryTag) throws AMQException
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
- final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
+ final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag);
if (rejectedQueueEntry == null)
{
_logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
- return;
}
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
- final AMQQueue queue = rejectedQueueEntry.getQueue();
-
- final Exchange altExchange = queue.getAlternateExchange();
- unackedMap.remove(deliveryTag);
+ int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry requeueEntry)
+ {
+ _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getQueue().getName()));
+ }
+ }, null);
- if (altExchange == null)
+ if(requeues == 0)
{
- _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
- rejectedQueueEntry.delete();
- return;
- }
+ final AMQQueue queue = rejectedQueueEntry.getQueue();
+ final Exchange altExchange = queue.getAlternateExchange();
- final List<? extends BaseQueue> destinationQueues =
- altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties());
-
- if (destinationQueues == null || destinationQueues.isEmpty())
- {
- _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
- rejectedQueueEntry.delete();
- return;
- }
-
- rejectedQueueEntry.routeToAlternate();
+ if (altExchange == null)
+ {
+ _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
- //output operational logging for each delivery post commit
- for (final BaseQueue destinationQueue : destinationQueues)
- {
- _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName()));
+ }
+ else
+ {
+ _logger.debug(
+ "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+ + deliveryTag);
+ _actor.message(_logSubject,
+ ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ }
}
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 3b981b46b8..3d030890e0 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
@@ -35,7 +36,8 @@ import org.apache.qpid.server.txn.ServerTransaction;
public class ExchangeDestination implements ReceivingDestination, SendingDestination
{
private static final Accepted ACCEPTED = new Accepted();
- private static final Outcome[] OUTCOMES = { ACCEPTED };
+ public static final Rejected REJECTED = new Rejected();
+ private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
private Exchange _exchange;
private TerminusDurability _durability;
@@ -78,50 +80,10 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
return null;
}};
- List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties);
+ int enqueues = _exchange.send(message, instanceProperties, txn, null);
- if(queues == null || queues.isEmpty())
- {
- Exchange altExchange = _exchange.getAlternateExchange();
- if(altExchange != null)
- {
- queues = altExchange.route(message, instanceProperties);
- }
- }
-
- if(queues != null && !queues.isEmpty())
- {
- final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
-
- txn.enqueue(queues,message, new ServerTransaction.Action()
- {
- MessageReference _reference = message.newReference();
-
- public void postCommit()
- {
- for(int i = 0; i < baseQueues.length; i++)
- {
- try
- {
- baseQueues[i].enqueue(message);
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- _reference.release();
- }
-
- public void onRollback()
- {
- _reference.release();
- }
- });
- }
- return ACCEPTED;
+ return enqueues == 0 ? REJECTED : ACCEPTED;
}
TerminusDurability getDurability()
diff --git a/java/broker-plugins/management-http/pom.xml b/java/broker-plugins/management-http/pom.xml
index abc754902a..57b2dd863b 100644
--- a/java/broker-plugins/management-http/pom.xml
+++ b/java/broker-plugins/management-http/pom.xml
@@ -50,15 +50,15 @@
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-servlet_2.5_spec</artifactId>
- <version>1.2</version>
+ <artifactId>geronimo-servlet_3.0_spec</artifactId>
+ <version>1.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -79,14 +79,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -99,7 +99,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -112,7 +112,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -125,7 +125,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -138,14 +138,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index 039114056f..3375a784ea 100644
--- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -29,6 +29,7 @@ import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import javax.servlet.DispatcherType;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -77,7 +78,6 @@ import org.apache.qpid.server.plugin.PluginFactory;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.DispatcherType;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.SessionManager;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
@@ -396,7 +396,7 @@ public class HttpManagement extends AbstractPluginAdapter implements HttpManagem
root.addServlet(new ServletHolder(new LogFileServlet()), "/rest/logfile");
final SessionManager sessionManager = root.getSessionHandler().getSessionManager();
- sessionManager.setSessionCookie(JSESSIONID_COOKIE_PREFIX + lastPort);
+ sessionManager.getSessionCookieConfig().setName(JSESSIONID_COOKIE_PREFIX + lastPort);
sessionManager.setMaxInactiveInterval((Integer)getAttribute(TIME_OUT));
return server;
diff --git a/java/broker-plugins/websocket/pom.xml b/java/broker-plugins/websocket/pom.xml
index 2029bd33aa..fb55be05c8 100644
--- a/java/broker-plugins/websocket/pom.xml
+++ b/java/broker-plugins/websocket/pom.xml
@@ -38,15 +38,15 @@
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-servlet_2.5_spec</artifactId>
- <version>1.2</version>
+ <artifactId>geronimo-servlet_3.0_spec</artifactId>
+ <version>1.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -67,14 +67,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -87,7 +87,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -100,7 +100,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -113,7 +113,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -126,14 +126,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
diff --git a/java/build.deps b/java/build.deps
index 58dea7009e..4dc5b0ca46 100644
--- a/java/build.deps
+++ b/java/build.deps
@@ -35,7 +35,7 @@ geronimo-j2ee=lib/required/geronimo-j2ee-connector_1.5_spec-2.0.0.jar
geronimo-jta=lib/required/geronimo-jta_1.1_spec-1.1.1.jar
geronimo-kernel=lib/required/geronimo-kernel-2.2.1.jar
geronimo-openejb=lib/required/geronimo-ejb_3.0_spec-1.0.1.jar
-geronimo-servlet=lib/required/geronimo-servlet_2.5_spec-1.2.jar
+geronimo-servlet=lib/required/geronimo-servlet_3.0_spec-1.0.jar
junit=lib/required/junit-3.8.1.jar
mockito-all=lib/required/mockito-all-1.9.0.jar
@@ -49,14 +49,14 @@ slf4j-log4j=lib/required/slf4j-log4j12-1.6.4.jar
xalan=lib/required/xalan-2.7.0.jar
-jetty=lib/required/jetty-server-7.6.10.v20130312.jar
-jetty-continuation=lib/required/jetty-continuation-7.6.10.v20130312.jar
-jetty-security=lib/required/jetty-security-7.6.10.v20130312.jar
-jetty-util=lib/required/jetty-util-7.6.10.v20130312.jar
-jetty-io=lib/required/jetty-io-7.6.10.v20130312.jar
-jetty-http=lib/required/jetty-http-7.6.10.v20130312.jar
-jetty-servlet=lib/required/jetty-servlet-7.6.10.v20130312.jar
-jetty-websocket=lib/required/jetty-websocket-7.6.10.v20130312.jar
+jetty=lib/required/jetty-server-8.1.14.v20131031.jar
+jetty-continuation=lib/required/jetty-continuation-8.1.14.v20131031.jar
+jetty-security=lib/required/jetty-security-8.1.14.v20131031.jar
+jetty-util=lib/required/jetty-util-8.1.14.v20131031.jar
+jetty-io=lib/required/jetty-io-8.1.14.v20131031.jar
+jetty-http=lib/required/jetty-http-8.1.14.v20131031.jar
+jetty-servlet=lib/required/jetty-servlet-8.1.14.v20131031.jar
+jetty-websocket=lib/required/jetty-websocket-8.1.14.v20131031.jar
servlet-api=${geronimo-servlet}
dojo-version=1.9.1
diff --git a/java/ivy.retrieve.xml b/java/ivy.retrieve.xml
index 388e2d0dc4..59b3fa70af 100644
--- a/java/ivy.retrieve.xml
+++ b/java/ivy.retrieve.xml
@@ -49,7 +49,7 @@
<dependency org="org.apache.geronimo.specs" name="geronimo-j2ee-connector_1.5_spec" rev="2.0.0" transitive="false"/>
<dependency org="org.apache.geronimo.specs" name="geronimo-jms_1.1_spec" rev="1.0" transitive="false"/>
<dependency org="org.apache.geronimo.specs" name="geronimo-jta_1.1_spec" rev="1.1.1" transitive="false"/>
- <dependency org="org.apache.geronimo.specs" name="geronimo-servlet_2.5_spec" rev="1.2" transitive="false"/>
+ <dependency org="org.apache.geronimo.specs" name="geronimo-servlet_3.0_spec" rev="1.0" transitive="false"/>
<dependency org="com.google.code.gson" name="gson" rev="2.0" transitive="false"/>
<dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="1.9.0" transitive="false"/>
<dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="1.9.0" transitive="false"/>
@@ -61,14 +61,14 @@
<dependency org="org.mockito" name="mockito-all" rev="1.9.0" transitive="false"/>
<dependency org="org.slf4j" name="slf4j-api" rev="1.6.4" transitive="false"/>
<dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.4" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-server" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-websocket" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-continuation" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-io" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-http" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-security" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-servlet" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-util" rev="7.6.10.v20130312" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-server" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-websocket" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-continuation" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-io" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-http" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-security" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-servlet" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-util" rev="8.1.14.v20131031" transitive="false"/>
<dependency org="xalan" name="xalan" rev="2.7.0" transitive="false"/>
<dependency org="velocity" name="velocity" rev="1.4" transitive="false"/>
<dependency org="velocity" name="velocity-dep" rev="1.4" transitive="false"/>
diff --git a/java/jca/build.xml b/java/jca/build.xml
index 7137467e4b..83cc781ba9 100644
--- a/java/jca/build.xml
+++ b/java/jca/build.xml
@@ -24,7 +24,7 @@
<property name="module.name" value="jca"/>
<property name="module.genpom" value="true"/>
- <property name="module.genpom.args" value="-Sgeronimo-j2ee-connector_1.5_spec=provided -Sgeronimo-jta_1.1_spec=provided -Sgeronimo-jms_1.1_spec=provided -Sgeronimo-ejb_3.0_spec=provided -Sgeronimo-servlet_2.5_spec=provided -Sgeronimo-kernel=provided"/>
+ <property name="module.genpom.args" value="-Sgeronimo-j2ee-connector_1.5_spec=provided -Sgeronimo-jta_1.1_spec=provided -Sgeronimo-jms_1.1_spec=provided -Sgeronimo-ejb_3.0_spec=provided -Sgeronimo-servlet_3.0_spec=provided -Sgeronimo-kernel=provided"/>
<import file="../module.xml"/>
diff --git a/java/jca/example/build-geronimo-properties.xml b/java/jca/example/build-geronimo-properties.xml
index a20753117f..3c84b7634a 100644
--- a/java/jca/example/build-geronimo-properties.xml
+++ b/java/jca/example/build-geronimo-properties.xml
@@ -87,7 +87,6 @@
<path id="compile.classpath">
<fileset dir="${geronimo.home}/repository/org/apache/geronimo/specs">
<include name="geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar"/>
- <include name="geronimo-servlet_2.5_spec/1.2/geronimo-servlet_2.5_spec-1.2.jar"/>
<include name="geronimo-ejb_3.0_spec/1.0.1/geronimo-ejb_3.0_spec-1.0.1.jar"/>
<include name="geronimo-jta_1.1_spec/1.1.1/geronimo-jta_1.1_spec-1.1.1.jar"/>
@@ -113,7 +112,7 @@
<fileset dir="${geronimo.home}/repository/org/apache/geronimo/specs">
<include name="geronimo-j2ee-connector_1.5_spec/2.0.0/geronimo-j2ee-connector_1.5_spec-2.0.0.jar"/>
<include name="geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar"/>
- <include name="geronimo-servlet_2.5_spec/1.2/geronimo-servlet_2.5_spec-1.2.jar"/>
+ <include name="geronimo-servlet_3.0_spec/1.0/geronimo-servlet_3.0_spec-1.0.jar"/>
<include name="geronimo-ejb_3.0_spec/1.0.1/geronimo-ejb_3.0_spec-1.0.1.jar"/>
<include name="geronimo-jta_1.1_spec/1.1.1/geronimo-jta_1.1_spec-1.1.1.jar"/>
</fileset>
diff --git a/java/jca/pom.xml b/java/jca/pom.xml
index 859b8aabac..c7a8de61fe 100644
--- a/java/jca/pom.xml
+++ b/java/jca/pom.xml
@@ -70,8 +70,8 @@
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-servlet_2.5_spec</artifactId>
- <version>1.2</version>
+ <artifactId>geronimo-servlet_3.0_spec</artifactId>
+ <version>1.0</version>
<scope>provided</scope>
</dependency>
diff --git a/java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml b/java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml
index 11228afcfa..5e7093bb0a 100644
--- a/java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml
+++ b/java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml
@@ -17,6 +17,6 @@
-->
<dep>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-servlet_2.5_spec</artifactId>
- <version>1.2</version>
+ <artifactId>geronimo-servlet_3.0_spec</artifactId>
+ <version>1.0</version>
</dep>
diff --git a/java/lib/poms/jetty-continuation-7.6.10.v20130312.xml b/java/lib/poms/jetty-continuation-8.1.14.v20131031.xml
index 5beba95d17..10b7a4c499 100644
--- a/java/lib/poms/jetty-continuation-7.6.10.v20130312.xml
+++ b/java/lib/poms/jetty-continuation-8.1.14.v20131031.xml
@@ -18,5 +18,5 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
</dep>
diff --git a/java/lib/poms/jetty-http-7.6.10.v20130312.xml b/java/lib/poms/jetty-http-8.1.14.v20131031.xml
index 5c840bedd6..929fcbef3a 100644
--- a/java/lib/poms/jetty-http-7.6.10.v20130312.xml
+++ b/java/lib/poms/jetty-http-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
diff --git a/java/lib/poms/jetty-io-7.6.10.v20130312.xml b/java/lib/poms/jetty-io-8.1.14.v20131031.xml
index 9cec3998ea..42be6ad6ab 100644
--- a/java/lib/poms/jetty-io-7.6.10.v20130312.xml
+++ b/java/lib/poms/jetty-io-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
diff --git a/java/lib/poms/jetty-security-7.6.10.v20130312.xml b/java/lib/poms/jetty-security-8.1.14.v20131031.xml
index 9501750ba0..8079c78d96 100644
--- a/java/lib/poms/jetty-security-7.6.10.v20130312.xml
+++ b/java/lib/poms/jetty-security-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
diff --git a/java/lib/poms/jetty-server-7.6.10.v20130312.xml b/java/lib/poms/jetty-server-8.1.14.v20131031.xml
index 587860b50f..5b8160efd4 100644
--- a/java/lib/poms/jetty-server-7.6.10.v20130312.xml
+++ b/java/lib/poms/jetty-server-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
diff --git a/java/lib/poms/jetty-servlet-7.6.10.v20130312.xml b/java/lib/poms/jetty-servlet-8.1.14.v20131031.xml
index 4c0ff0a41b..5abcf03a18 100644
--- a/java/lib/poms/jetty-servlet-7.6.10.v20130312.xml
+++ b/java/lib/poms/jetty-servlet-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
diff --git a/java/lib/poms/jetty-util-7.6.10.v20130312.xml b/java/lib/poms/jetty-util-8.1.14.v20131031.xml
index f5c990248f..e134444e44 100644
--- a/java/lib/poms/jetty-util-7.6.10.v20130312.xml
+++ b/java/lib/poms/jetty-util-8.1.14.v20131031.xml
@@ -18,5 +18,5 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
</dep>
diff --git a/java/lib/poms/jetty-websocket-7.6.10.v20130312.xml b/java/lib/poms/jetty-websocket-8.1.14.v20131031.xml
index 4d3ebd1666..1592ca3d56 100644
--- a/java/lib/poms/jetty-websocket-7.6.10.v20130312.xml
+++ b/java/lib/poms/jetty-websocket-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index 861b225e6f..f92a133919 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -624,32 +624,10 @@ public class MessageStoreTest extends QpidTestCase
storedMessage.flushToStore();
final AMQMessage currentMessage = new AMQMessage(storedMessage);
- final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage, InstanceProperties.EMPTY);
ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
-
- trans.enqueue(destinationQueues, currentMessage, new ServerTransaction.Action() {
- public void postCommit()
- {
- try
- {
- for(BaseQueue queue : destinationQueues)
- {
- queue.enqueue(currentMessage);
- }
- }
- catch (AMQException e)
- {
- _logger.error("Problem enqueing message", e);
- }
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
+ exchange.send(currentMessage, InstanceProperties.EMPTY, trans, null);
}