diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-01 15:47:08 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-01 15:47:08 +0000 |
commit | a9b950ac164bb7e2dd05ae44f99d4b728697ad65 (patch) | |
tree | f04a672165555034659f1beab0c140615ed32d67 | |
parent | 1811ebfb05944ab40e9a4490bc3f797087d98cb3 (diff) | |
download | qpid-python-a9b950ac164bb7e2dd05ae44f99d4b728697ad65.tar.gz |
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1563433 13f79535-47bb-0310-9956-ffa450edef68
33 files changed, 401 insertions, 431 deletions
diff --git a/java/amqp-1-0-client-websocket/pom.xml b/java/amqp-1-0-client-websocket/pom.xml index 205e0d5ab7..3862fb0fc5 100644 --- a/java/amqp-1-0-client-websocket/pom.xml +++ b/java/amqp-1-0-client-websocket/pom.xml @@ -44,15 +44,15 @@ <dependency> <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-servlet_2.5_spec</artifactId> - <version>1.2</version> + <artifactId>geronimo-servlet_3.0_spec</artifactId> + <version>1.0</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -73,14 +73,14 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-continuation</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-security</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -93,7 +93,7 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-http</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -105,7 +105,7 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-io</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -118,7 +118,7 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-servlet</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -131,14 +131,14 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-websocket</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index b00d98637e..6a959df440 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -33,12 +33,14 @@ import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; import org.apache.qpid.server.logging.subjects.ExchangeLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -374,9 +376,9 @@ public abstract class AbstractExchange implements Exchange return getBindings().size(); } - @Override - public final List<? extends BaseQueue> route(final ServerMessage message, - final InstanceProperties instanceProperties) + + final List<? extends BaseQueue> route(final ServerMessage message, + final InstanceProperties instanceProperties) { _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); @@ -416,6 +418,59 @@ public abstract class AbstractExchange implements Exchange return queues; } + public final int send(final ServerMessage message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final BaseQueue.PostEnqueueAction postEnqueueAction) + { + List<? extends BaseQueue> queues = route(message, instanceProperties); + + if(queues == null || queues.isEmpty()) + { + Exchange altExchange = getAlternateExchange(); + if(altExchange != null) + { + return altExchange.send(message, instanceProperties, txn, postEnqueueAction); + } + else + { + return 0; + } + } + else + { + final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]); + + txn.enqueue(queues,message, new ServerTransaction.Action() + { + MessageReference _reference = message.newReference(); + + public void postCommit() + { + for(int i = 0; i < baseQueues.length; i++) + { + try + { + baseQueues[i].enqueue(message, postEnqueueAction); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + } + _reference.release(); + } + + public void onRollback() + { + _reference.release(); + } + }); + return queues.size(); + } + } + protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message, final InstanceProperties instanceProperties); @@ -679,4 +734,6 @@ public abstract class AbstractExchange implements Exchange public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException; } + + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index e2582019cd..71d0f8b4dd 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -36,11 +36,14 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; public class DefaultExchange implements Exchange @@ -204,22 +207,6 @@ public class DefaultExchange implements Exchange } @Override - public List<AMQQueue> route(ServerMessage message, final InstanceProperties instanceProperties) - { - AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); - if(q == null) - { - List<AMQQueue> noQueues = Collections.emptyList(); - return noQueues; - } - else - { - return Collections.singletonList(q); - } - - } - - @Override public boolean isBound(AMQQueue queue) { return _virtualHost.getQueue(queue.getName()) == queue; @@ -343,4 +330,47 @@ public class DefaultExchange implements Exchange { return _id; } + + public final int send(final ServerMessage message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final BaseQueue.PostEnqueueAction postEnqueueAction) + { + final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); + if(q == null) + { + return 0; + } + else + { + txn.enqueue(q,message, new ServerTransaction.Action() + { + MessageReference _reference = message.newReference(); + + public void postCommit() + { + try + { + q.enqueue(message, postEnqueueAction); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + finally + { + _reference.release(); + } + } + + public void onRollback() + { + _reference.release(); + } + }); + return 1; + } + } + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 78455c9261..18e912e972 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -94,13 +95,17 @@ public interface Exchange extends ExchangeReferrer void close() throws AMQException; /** - * Returns a list of queues to which to route this message. If there are - * no queues the empty list must be returned. - * - * @return list of queues to which to route the message. + * Routes a message + * @param message the message to be routed + * @param instanceProperties the instance properties + * @param txn the transaction to enqueue within + * @param postEnqueueAction action to perform on the result of every enqueue (may be null) + * @return the number of queues in which the message was enqueued performed */ - List<? extends BaseQueue> route(ServerMessage message, final InstanceProperties instanceProperties); - + int send(ServerMessage message, + InstanceProperties instanceProperties, + ServerTransaction txn, + BaseQueue.PostEnqueueAction postEnqueueAction); /** * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java new file mode 100644 index 0000000000..afd7ff0269 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + + +public interface MessageInstance +{ + + boolean isAvailable(); + + boolean acquire(); + + boolean isAcquired(); + + void release(); + + void delete(); + + boolean isDeleted(); + + ServerMessage getMessage(); + + InstanceProperties getInstanceProperties(); +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 80ccbe1649..2aa1d1f473 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -22,11 +22,11 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.txn.ServerTransaction; -public interface QueueEntry extends Comparable<QueueEntry> +public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> { @@ -177,26 +177,17 @@ public interface QueueEntry extends Comparable<QueueEntry> AMQQueue getQueue(); - ServerMessage getMessage(); - long getSize(); boolean getDeliveredToConsumer(); boolean expired() throws AMQException; - boolean isAvailable(); - - boolean isAcquired(); - - boolean acquire(); boolean acquire(Subscription sub); boolean acquiredBySubscription(); boolean isAcquiredBy(Subscription subscription); - void release(); - void setRedelivered(); boolean isRedelivered(); @@ -207,16 +198,7 @@ public interface QueueEntry extends Comparable<QueueEntry> boolean isRejectedBy(long subscriptionId); - void delete(); - - /** - * Returns true if entry is either DEQUED or DELETED state. - * - * @return true if entry is either DEQUED or DELETED state - */ - boolean isDeleted(); - - void routeToAlternate(); + int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn); boolean isQueueDeleted(); @@ -241,5 +223,4 @@ public interface QueueEntry extends Comparable<QueueEntry> Filterable asFilterable(); - InstanceProperties getInstanceProperties(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index ed61f1acf6..461d493437 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -34,7 +34,6 @@ import org.apache.qpid.server.txn.ServerTransaction; import java.util.EnumMap; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -250,7 +249,7 @@ public abstract class QueueEntryImpl implements QueueEntry } else if(acquire()) { - routeToAlternate(); + routeToAlternate(null, null); } } @@ -368,65 +367,43 @@ public abstract class QueueEntryImpl implements QueueEntry dispose(); } - public void routeToAlternate() + public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); Exchange alternateExchange = currentQueue.getAlternateExchange(); - + boolean autocommit = txn == null; if (alternateExchange != null) { - List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), getInstanceProperties()); - final ServerMessage message = getMessage(); - if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null) + if(autocommit) { - queues = alternateExchange.getAlternateExchange().route(getMessage(), getInstanceProperties()); + txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore()); } + int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action); - - if (queues != null && queues.size() != 0) + txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action() { - final List<? extends BaseQueue> rerouteQueues = queues; - ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore()); - - txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() + public void postCommit() { - public void postCommit() - { - try - { - for (BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - - } - }); - - txn.dequeue(currentQueue, message, new ServerTransaction.Action() - { - public void postCommit() - { - delete(); - } + delete(); + } - public void onRollback() - { + public void onRollback() + { - } - }); + } + }); + if(autocommit) + { txn.commit(); } + return enqueues; + + } + else + { + return 0; } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index d63d1946d3..87d11a892e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1355,93 +1355,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); - if(_alternateExchange != null) + + for(final QueueEntry entry : entries) { + // TODO log requeues with a post enqueue action + int requeues = entry.routeToAlternate(null, txn); - for(final QueueEntry entry : entries) + if(requeues == 0) { - - List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), entry.getInstanceProperties()); - if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null) - { - queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(), entry.getInstanceProperties()); - } - - final ServerMessage message = entry.getMessage(); - if(queues != null && queues.size() != 0) - { - final List<? extends BaseQueue> rerouteQueues = queues; - txn.enqueue(rerouteQueues, entry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - try - { - for(BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - - } - - public void onRollback() - { - - } - }); - txn.dequeue(this, entry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - entry.delete(); - } - - public void onRollback() - { - } - }); - } - + // TODO log discard } - - _alternateExchange.removeReference(this); } - else - { - // TODO log discard - - for(final QueueEntry entry : entries) - { - final ServerMessage message = entry.getMessage(); - if(message != null) - { - txn.dequeue(this, message, - new ServerTransaction.Action() - { - public void postCommit() - { - entry.delete(); - } + txn.commit(); - public void onRollback() - { - } - }); - } - } + if(_alternateExchange != null) + { + _alternateExchange.removeReference(this); } - txn.commit(); for (Task task : _deleteTaskList) { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 7bd525c90f..764549626a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -312,10 +312,9 @@ public class TopicExchangeTest extends QpidTestCase private int routeMessage(String routingKey, long messageNumber) throws AMQException { - ServerMessage serverMessage = mock(ServerMessage.class); - when(serverMessage.getRoutingKey()).thenReturn(routingKey); - List<? extends BaseQueue> queues = _exchange.route(serverMessage, InstanceProperties.EMPTY); ServerMessage message = mock(ServerMessage.class); + when(message.getRoutingKey()).thenReturn(routingKey); + List<? extends BaseQueue> queues = _exchange.route(message, InstanceProperties.EMPTY); MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 2e3231e208..d3c866f747 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.txn.ServerTransaction; public class MockQueueEntry implements QueueEntry { @@ -62,9 +63,9 @@ public class MockQueueEntry implements QueueEntry } - public void routeToAlternate() + public int routeToAlternate(final BaseQueue.PostEnqueueAction action, final ServerTransaction txn) { - + return 0; } public boolean expired() throws AMQException diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index fe82f65115..bae5616042 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -53,6 +54,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -102,6 +104,14 @@ public class ServerSession extends Session private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); + private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction() + { + @Override + public void onEnqueue(final QueueEntry entry) + { + entry.getQueue().checkCapacity(ServerSession.this); + } + }; public static interface MessageDispositionChangeListener { @@ -182,7 +192,9 @@ public class ServerSession extends Session return isCommandsFull(id); } - public void enqueue(final MessageTransferMessage message, final List<? extends BaseQueue> queues) + public int enqueue(final MessageTransferMessage message, + final InstanceProperties instanceProperties, + final Exchange exchange) { if(_outstandingCredit.get() != UNLIMITED_CREDIT && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) @@ -190,10 +202,10 @@ public class ServerSession extends Session _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); } + int enqueues = exchange.send(message, instanceProperties, _transaction, _checkCapacityAction); getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ; - _transaction.enqueue(queues,message, postTransactionAction); incrementOutstandingTxnsIfNecessary(); + return enqueues; } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 973f706e0a..dcca696529 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -39,7 +39,6 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -337,28 +336,10 @@ public class ServerSessionDelegate extends SessionDelegate } }; - List<? extends BaseQueue> queues = exchange.route(message, instanceProperties); - if(queues.isEmpty() && exchange.getAlternateExchange() != null) - { - final Exchange alternateExchange = exchange.getAlternateExchange(); - queues = alternateExchange.route(message, instanceProperties); - if (!queues.isEmpty()) - { - exchangeInUse = alternateExchange; - } - else - { - exchangeInUse = exchange; - } - } - else - { - exchangeInUse = exchange; - } + int enqueues = serverSession.enqueue(message, instanceProperties, exchange); - if(!queues.isEmpty()) + if(enqueues != 0) { - serverSession.enqueue(message, queues); storeMessage.flushToStore(); } else @@ -372,7 +353,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey())); + serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey())); } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index 17d0e5cb64..357b565365 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -59,7 +59,6 @@ import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -544,7 +543,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr void reject(final QueueEntry entry) { entry.setRedelivered(); - entry.routeToAlternate(); + entry.routeToAlternate(null, null); if(entry.isAcquiredBy(this)) { entry.delete(); @@ -575,35 +574,36 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr protected void sendToDLQOrDiscard(QueueEntry entry) { - final Exchange alternateExchange = entry.getQueue().getAlternateExchange(); final LogActor logActor = CurrentActor.get(); final ServerMessage msg = entry.getMessage(); - if (alternateExchange != null) + + int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction() + { + @Override + public void onEnqueue(final QueueEntry requeueEntry) + { + logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + requeueEntry.getQueue().getName())); + } + }, null); + + if (requeues == 0) { - final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties()); + final AMQQueue queue = entry.getQueue(); + final Exchange alternateExchange = queue.getAlternateExchange(); - if (destinationQueues == null || destinationQueues.isEmpty()) + if(alternateExchange != null) { - entry.delete(); - - logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); + logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), + alternateExchange.getName())); } else { - entry.routeToAlternate(); - - //output operational logging for each delivery post commit - for (final BaseQueue destinationQueue : destinationQueues) - { - logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName())); - } + logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), + queue.getName(), + msg.getRoutingKey())); } } - else - { - entry.delete(); - logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey())); - } } private boolean isMaxDeliveryLimitReached(QueueEntry entry) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index b7dc105cb7..c6d4151628 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -165,6 +165,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final TransactionTimeoutHelper _transactionTimeoutHelper; private final UUID _id = UUID.randomUUID(); + + private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); + private final ImmediateAction _immediateAction = new ImmediateAction(); + + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { @@ -330,6 +335,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } else { + final boolean immediate = _currentMessage.getMessagePublishInfo().isImmediate(); + final InstanceProperties instanceProperties = new InstanceProperties() { @@ -341,7 +348,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F case EXPIRATION: return amqMessage.getExpiration(); case IMMEDIATE: - return _currentMessage.getMessagePublishInfo().isImmediate(); + return immediate; case PERSISTENT: return amqMessage.isPersistent(); case MANDATORY: @@ -353,21 +360,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } }; - final List<? extends BaseQueue> destinationQueues = - _currentMessage.getExchange().route(amqMessage, instanceProperties); - - if(destinationQueues == null || destinationQueues.isEmpty()) + int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction, + immediate ? _immediateAction : _capacityCheckAction); + if(enqueues == 0) { handleUnroutableMessage(amqMessage); } else { - _transaction.enqueue(destinationQueues, - amqMessage, - new MessageDeliveryAction(amqMessage, destinationQueues)); incrementOutstandingTxnsIfNecessary(); handle.flushToStore(); - } } } @@ -1258,7 +1260,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if(immediate) { - action = new ImmediateAction(queue); + action = new ImmediateAction(); } else { @@ -1291,58 +1293,72 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _reference.release(); } - private class ImmediateAction implements BaseQueue.PostEnqueueAction + + } + private class ImmediateAction implements BaseQueue.PostEnqueueAction + { + + public ImmediateAction() { - private final BaseQueue _queue; + } - public ImmediateAction(BaseQueue queue) - { - _queue = queue; - } + public void onEnqueue(QueueEntry entry) + { + AMQQueue queue = entry.getQueue(); - public void onEnqueue(QueueEntry entry) + if (!entry.getDeliveredToConsumer() && entry.acquire()) { - if (!entry.getDeliveredToConsumer() && entry.acquire()) - { - - ServerTransaction txn = new LocalTransaction(_messageStore); - Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); - entries.add(entry); - final AMQMessage message = (AMQMessage) entry.getMessage(); - txn.dequeue(_queue, entry.getMessage(), - new MessageAcknowledgeAction(entries) + ServerTransaction txn = new LocalTransaction(_messageStore); + Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); + entries.add(entry); + final AMQMessage message = (AMQMessage) entry.getMessage(); + txn.dequeue(queue, entry.getMessage(), + new MessageAcknowledgeAction(entries) + { + @Override + public void postCommit() { - @Override - public void postCommit() + try { - try - { - final - ProtocolOutputConverter outputConverter = - _session.getProtocolOutputConverter(); - - outputConverter.writeReturn(message.getMessagePublishInfo(), - message.getContentHeaderBody(), - message, - _channelId, - AMQConstant.NO_CONSUMERS.getCode(), - IMMEDIATE_DELIVERY_REPLY_TEXT); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - super.postCommit(); + final + ProtocolOutputConverter outputConverter = + _session.getProtocolOutputConverter(); + + outputConverter.writeReturn(message.getMessagePublishInfo(), + message.getContentHeaderBody(), + message, + _channelId, + AMQConstant.NO_CONSUMERS.getCode(), + IMMEDIATE_DELIVERY_REPLY_TEXT); + } + catch (AMQException e) + { + throw new RuntimeException(e); } + super.postCommit(); } - ); - txn.commit(); - + } + ); + txn.commit(); - } } + else + { + queue.checkCapacity(AMQChannel.this); + } + + } + } + + private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction + { + @Override + public void onEnqueue(final QueueEntry entry) + { + AMQQueue queue = entry.getQueue(); + queue.checkCapacity(AMQChannel.this); } } @@ -1550,48 +1566,46 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void deadLetter(long deliveryTag) throws AMQException { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); - final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag); + final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag); if (rejectedQueueEntry == null) { _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag); - return; } else { final ServerMessage msg = rejectedQueueEntry.getMessage(); - final AMQQueue queue = rejectedQueueEntry.getQueue(); - - final Exchange altExchange = queue.getAlternateExchange(); - unackedMap.remove(deliveryTag); + int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction() + { + @Override + public void onEnqueue(final QueueEntry requeueEntry) + { + _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + requeueEntry.getQueue().getName())); + } + }, null); - if (altExchange == null) + if(requeues == 0) { - _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); - rejectedQueueEntry.delete(); - return; - } + final AMQQueue queue = rejectedQueueEntry.getQueue(); + final Exchange altExchange = queue.getAlternateExchange(); - final List<? extends BaseQueue> destinationQueues = - altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties()); - - if (destinationQueues == null || destinationQueues.isEmpty()) - { - _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); - rejectedQueueEntry.delete(); - return; - } - - rejectedQueueEntry.routeToAlternate(); + if (altExchange == null) + { + _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); + _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); - //output operational logging for each delivery post commit - for (final BaseQueue destinationQueue : destinationQueues) - { - _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName())); + } + else + { + _logger.debug( + "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + + deliveryTag); + _actor.message(_logSubject, + ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); + } } } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 3b981b46b8..3d030890e0 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.qpid.AMQException; import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.Rejected; import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.exchange.Exchange; @@ -35,7 +36,8 @@ import org.apache.qpid.server.txn.ServerTransaction; public class ExchangeDestination implements ReceivingDestination, SendingDestination { private static final Accepted ACCEPTED = new Accepted(); - private static final Outcome[] OUTCOMES = { ACCEPTED }; + public static final Rejected REJECTED = new Rejected(); + private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED}; private Exchange _exchange; private TerminusDurability _durability; @@ -78,50 +80,10 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina return null; }}; - List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties); + int enqueues = _exchange.send(message, instanceProperties, txn, null); - if(queues == null || queues.isEmpty()) - { - Exchange altExchange = _exchange.getAlternateExchange(); - if(altExchange != null) - { - queues = altExchange.route(message, instanceProperties); - } - } - - if(queues != null && !queues.isEmpty()) - { - final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]); - - txn.enqueue(queues,message, new ServerTransaction.Action() - { - MessageReference _reference = message.newReference(); - - public void postCommit() - { - for(int i = 0; i < baseQueues.length; i++) - { - try - { - baseQueues[i].enqueue(message); - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - _reference.release(); - } - - public void onRollback() - { - _reference.release(); - } - }); - } - return ACCEPTED; + return enqueues == 0 ? REJECTED : ACCEPTED; } TerminusDurability getDurability() diff --git a/java/broker-plugins/management-http/pom.xml b/java/broker-plugins/management-http/pom.xml index abc754902a..57b2dd863b 100644 --- a/java/broker-plugins/management-http/pom.xml +++ b/java/broker-plugins/management-http/pom.xml @@ -50,15 +50,15 @@ <dependency> <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-servlet_2.5_spec</artifactId> - <version>1.2</version> + <artifactId>geronimo-servlet_3.0_spec</artifactId> + <version>1.0</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -79,14 +79,14 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-continuation</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-security</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -99,7 +99,7 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-http</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -112,7 +112,7 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-io</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -125,7 +125,7 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-servlet</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -138,14 +138,14 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-websocket</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index 039114056f..3375a784ea 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -29,6 +29,7 @@ import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; +import javax.servlet.DispatcherType; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -77,7 +78,6 @@ import org.apache.qpid.server.plugin.PluginFactory; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager; import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.DispatcherType; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.SessionManager; import org.eclipse.jetty.server.nio.SelectChannelConnector; @@ -396,7 +396,7 @@ public class HttpManagement extends AbstractPluginAdapter implements HttpManagem root.addServlet(new ServletHolder(new LogFileServlet()), "/rest/logfile"); final SessionManager sessionManager = root.getSessionHandler().getSessionManager(); - sessionManager.setSessionCookie(JSESSIONID_COOKIE_PREFIX + lastPort); + sessionManager.getSessionCookieConfig().setName(JSESSIONID_COOKIE_PREFIX + lastPort); sessionManager.setMaxInactiveInterval((Integer)getAttribute(TIME_OUT)); return server; diff --git a/java/broker-plugins/websocket/pom.xml b/java/broker-plugins/websocket/pom.xml index 2029bd33aa..fb55be05c8 100644 --- a/java/broker-plugins/websocket/pom.xml +++ b/java/broker-plugins/websocket/pom.xml @@ -38,15 +38,15 @@ <dependency> <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-servlet_2.5_spec</artifactId> - <version>1.2</version> + <artifactId>geronimo-servlet_3.0_spec</artifactId> + <version>1.0</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -67,14 +67,14 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-continuation</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-security</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -87,7 +87,7 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-http</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -100,7 +100,7 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-io</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -113,7 +113,7 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-servlet</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> @@ -126,14 +126,14 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-websocket</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <scope>compile</scope> <exclusions> <exclusion> diff --git a/java/build.deps b/java/build.deps index 58dea7009e..4dc5b0ca46 100644 --- a/java/build.deps +++ b/java/build.deps @@ -35,7 +35,7 @@ geronimo-j2ee=lib/required/geronimo-j2ee-connector_1.5_spec-2.0.0.jar geronimo-jta=lib/required/geronimo-jta_1.1_spec-1.1.1.jar geronimo-kernel=lib/required/geronimo-kernel-2.2.1.jar geronimo-openejb=lib/required/geronimo-ejb_3.0_spec-1.0.1.jar -geronimo-servlet=lib/required/geronimo-servlet_2.5_spec-1.2.jar +geronimo-servlet=lib/required/geronimo-servlet_3.0_spec-1.0.jar junit=lib/required/junit-3.8.1.jar mockito-all=lib/required/mockito-all-1.9.0.jar @@ -49,14 +49,14 @@ slf4j-log4j=lib/required/slf4j-log4j12-1.6.4.jar xalan=lib/required/xalan-2.7.0.jar -jetty=lib/required/jetty-server-7.6.10.v20130312.jar -jetty-continuation=lib/required/jetty-continuation-7.6.10.v20130312.jar -jetty-security=lib/required/jetty-security-7.6.10.v20130312.jar -jetty-util=lib/required/jetty-util-7.6.10.v20130312.jar -jetty-io=lib/required/jetty-io-7.6.10.v20130312.jar -jetty-http=lib/required/jetty-http-7.6.10.v20130312.jar -jetty-servlet=lib/required/jetty-servlet-7.6.10.v20130312.jar -jetty-websocket=lib/required/jetty-websocket-7.6.10.v20130312.jar +jetty=lib/required/jetty-server-8.1.14.v20131031.jar +jetty-continuation=lib/required/jetty-continuation-8.1.14.v20131031.jar +jetty-security=lib/required/jetty-security-8.1.14.v20131031.jar +jetty-util=lib/required/jetty-util-8.1.14.v20131031.jar +jetty-io=lib/required/jetty-io-8.1.14.v20131031.jar +jetty-http=lib/required/jetty-http-8.1.14.v20131031.jar +jetty-servlet=lib/required/jetty-servlet-8.1.14.v20131031.jar +jetty-websocket=lib/required/jetty-websocket-8.1.14.v20131031.jar servlet-api=${geronimo-servlet} dojo-version=1.9.1 diff --git a/java/ivy.retrieve.xml b/java/ivy.retrieve.xml index 388e2d0dc4..59b3fa70af 100644 --- a/java/ivy.retrieve.xml +++ b/java/ivy.retrieve.xml @@ -49,7 +49,7 @@ <dependency org="org.apache.geronimo.specs" name="geronimo-j2ee-connector_1.5_spec" rev="2.0.0" transitive="false"/> <dependency org="org.apache.geronimo.specs" name="geronimo-jms_1.1_spec" rev="1.0" transitive="false"/> <dependency org="org.apache.geronimo.specs" name="geronimo-jta_1.1_spec" rev="1.1.1" transitive="false"/> - <dependency org="org.apache.geronimo.specs" name="geronimo-servlet_2.5_spec" rev="1.2" transitive="false"/> + <dependency org="org.apache.geronimo.specs" name="geronimo-servlet_3.0_spec" rev="1.0" transitive="false"/> <dependency org="com.google.code.gson" name="gson" rev="2.0" transitive="false"/> <dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="1.9.0" transitive="false"/> <dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="1.9.0" transitive="false"/> @@ -61,14 +61,14 @@ <dependency org="org.mockito" name="mockito-all" rev="1.9.0" transitive="false"/> <dependency org="org.slf4j" name="slf4j-api" rev="1.6.4" transitive="false"/> <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.4" transitive="false"/> - <dependency org="org.eclipse.jetty" name="jetty-server" rev="7.6.10.v20130312" transitive="false"/> - <dependency org="org.eclipse.jetty" name="jetty-websocket" rev="7.6.10.v20130312" transitive="false"/> - <dependency org="org.eclipse.jetty" name="jetty-continuation" rev="7.6.10.v20130312" transitive="false"/> - <dependency org="org.eclipse.jetty" name="jetty-io" rev="7.6.10.v20130312" transitive="false"/> - <dependency org="org.eclipse.jetty" name="jetty-http" rev="7.6.10.v20130312" transitive="false"/> - <dependency org="org.eclipse.jetty" name="jetty-security" rev="7.6.10.v20130312" transitive="false"/> - <dependency org="org.eclipse.jetty" name="jetty-servlet" rev="7.6.10.v20130312" transitive="false"/> - <dependency org="org.eclipse.jetty" name="jetty-util" rev="7.6.10.v20130312" transitive="false"/> + <dependency org="org.eclipse.jetty" name="jetty-server" rev="8.1.14.v20131031" transitive="false"/> + <dependency org="org.eclipse.jetty" name="jetty-websocket" rev="8.1.14.v20131031" transitive="false"/> + <dependency org="org.eclipse.jetty" name="jetty-continuation" rev="8.1.14.v20131031" transitive="false"/> + <dependency org="org.eclipse.jetty" name="jetty-io" rev="8.1.14.v20131031" transitive="false"/> + <dependency org="org.eclipse.jetty" name="jetty-http" rev="8.1.14.v20131031" transitive="false"/> + <dependency org="org.eclipse.jetty" name="jetty-security" rev="8.1.14.v20131031" transitive="false"/> + <dependency org="org.eclipse.jetty" name="jetty-servlet" rev="8.1.14.v20131031" transitive="false"/> + <dependency org="org.eclipse.jetty" name="jetty-util" rev="8.1.14.v20131031" transitive="false"/> <dependency org="xalan" name="xalan" rev="2.7.0" transitive="false"/> <dependency org="velocity" name="velocity" rev="1.4" transitive="false"/> <dependency org="velocity" name="velocity-dep" rev="1.4" transitive="false"/> diff --git a/java/jca/build.xml b/java/jca/build.xml index 7137467e4b..83cc781ba9 100644 --- a/java/jca/build.xml +++ b/java/jca/build.xml @@ -24,7 +24,7 @@ <property name="module.name" value="jca"/> <property name="module.genpom" value="true"/> - <property name="module.genpom.args" value="-Sgeronimo-j2ee-connector_1.5_spec=provided -Sgeronimo-jta_1.1_spec=provided -Sgeronimo-jms_1.1_spec=provided -Sgeronimo-ejb_3.0_spec=provided -Sgeronimo-servlet_2.5_spec=provided -Sgeronimo-kernel=provided"/> + <property name="module.genpom.args" value="-Sgeronimo-j2ee-connector_1.5_spec=provided -Sgeronimo-jta_1.1_spec=provided -Sgeronimo-jms_1.1_spec=provided -Sgeronimo-ejb_3.0_spec=provided -Sgeronimo-servlet_3.0_spec=provided -Sgeronimo-kernel=provided"/> <import file="../module.xml"/> diff --git a/java/jca/example/build-geronimo-properties.xml b/java/jca/example/build-geronimo-properties.xml index a20753117f..3c84b7634a 100644 --- a/java/jca/example/build-geronimo-properties.xml +++ b/java/jca/example/build-geronimo-properties.xml @@ -87,7 +87,6 @@ <path id="compile.classpath"> <fileset dir="${geronimo.home}/repository/org/apache/geronimo/specs"> <include name="geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar"/> - <include name="geronimo-servlet_2.5_spec/1.2/geronimo-servlet_2.5_spec-1.2.jar"/> <include name="geronimo-ejb_3.0_spec/1.0.1/geronimo-ejb_3.0_spec-1.0.1.jar"/> <include name="geronimo-jta_1.1_spec/1.1.1/geronimo-jta_1.1_spec-1.1.1.jar"/> @@ -113,7 +112,7 @@ <fileset dir="${geronimo.home}/repository/org/apache/geronimo/specs"> <include name="geronimo-j2ee-connector_1.5_spec/2.0.0/geronimo-j2ee-connector_1.5_spec-2.0.0.jar"/> <include name="geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar"/> - <include name="geronimo-servlet_2.5_spec/1.2/geronimo-servlet_2.5_spec-1.2.jar"/> + <include name="geronimo-servlet_3.0_spec/1.0/geronimo-servlet_3.0_spec-1.0.jar"/> <include name="geronimo-ejb_3.0_spec/1.0.1/geronimo-ejb_3.0_spec-1.0.1.jar"/> <include name="geronimo-jta_1.1_spec/1.1.1/geronimo-jta_1.1_spec-1.1.1.jar"/> </fileset> diff --git a/java/jca/pom.xml b/java/jca/pom.xml index 859b8aabac..c7a8de61fe 100644 --- a/java/jca/pom.xml +++ b/java/jca/pom.xml @@ -70,8 +70,8 @@ <dependency> <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-servlet_2.5_spec</artifactId> - <version>1.2</version> + <artifactId>geronimo-servlet_3.0_spec</artifactId> + <version>1.0</version> <scope>provided</scope> </dependency> diff --git a/java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml b/java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml index 11228afcfa..5e7093bb0a 100644 --- a/java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml +++ b/java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml @@ -17,6 +17,6 @@ --> <dep> <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-servlet_2.5_spec</artifactId> - <version>1.2</version> + <artifactId>geronimo-servlet_3.0_spec</artifactId> + <version>1.0</version> </dep> diff --git a/java/lib/poms/jetty-continuation-7.6.10.v20130312.xml b/java/lib/poms/jetty-continuation-8.1.14.v20131031.xml index 5beba95d17..10b7a4c499 100644 --- a/java/lib/poms/jetty-continuation-7.6.10.v20130312.xml +++ b/java/lib/poms/jetty-continuation-8.1.14.v20131031.xml @@ -18,5 +18,5 @@ <dep> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-continuation</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> </dep> diff --git a/java/lib/poms/jetty-http-7.6.10.v20130312.xml b/java/lib/poms/jetty-http-8.1.14.v20131031.xml index 5c840bedd6..929fcbef3a 100644 --- a/java/lib/poms/jetty-http-7.6.10.v20130312.xml +++ b/java/lib/poms/jetty-http-8.1.14.v20131031.xml @@ -18,7 +18,7 @@ <dep> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-http</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <exclusions> <exclusion> <groupId>org.eclipse.jetty</groupId> diff --git a/java/lib/poms/jetty-io-7.6.10.v20130312.xml b/java/lib/poms/jetty-io-8.1.14.v20131031.xml index 9cec3998ea..42be6ad6ab 100644 --- a/java/lib/poms/jetty-io-7.6.10.v20130312.xml +++ b/java/lib/poms/jetty-io-8.1.14.v20131031.xml @@ -18,7 +18,7 @@ <dep> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-io</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <exclusions> <exclusion> <groupId>org.eclipse.jetty</groupId> diff --git a/java/lib/poms/jetty-security-7.6.10.v20130312.xml b/java/lib/poms/jetty-security-8.1.14.v20131031.xml index 9501750ba0..8079c78d96 100644 --- a/java/lib/poms/jetty-security-7.6.10.v20130312.xml +++ b/java/lib/poms/jetty-security-8.1.14.v20131031.xml @@ -18,7 +18,7 @@ <dep> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-security</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <exclusions> <exclusion> <groupId>org.eclipse.jetty</groupId> diff --git a/java/lib/poms/jetty-server-7.6.10.v20130312.xml b/java/lib/poms/jetty-server-8.1.14.v20131031.xml index 587860b50f..5b8160efd4 100644 --- a/java/lib/poms/jetty-server-7.6.10.v20130312.xml +++ b/java/lib/poms/jetty-server-8.1.14.v20131031.xml @@ -18,7 +18,7 @@ <dep> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <exclusions> <exclusion> <groupId>org.eclipse.jetty.orbit</groupId> diff --git a/java/lib/poms/jetty-servlet-7.6.10.v20130312.xml b/java/lib/poms/jetty-servlet-8.1.14.v20131031.xml index 4c0ff0a41b..5abcf03a18 100644 --- a/java/lib/poms/jetty-servlet-7.6.10.v20130312.xml +++ b/java/lib/poms/jetty-servlet-8.1.14.v20131031.xml @@ -18,7 +18,7 @@ <dep> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-servlet</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <exclusions> <exclusion> <groupId>org.eclipse.jetty</groupId> diff --git a/java/lib/poms/jetty-util-7.6.10.v20130312.xml b/java/lib/poms/jetty-util-8.1.14.v20131031.xml index f5c990248f..e134444e44 100644 --- a/java/lib/poms/jetty-util-7.6.10.v20130312.xml +++ b/java/lib/poms/jetty-util-8.1.14.v20131031.xml @@ -18,5 +18,5 @@ <dep> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> </dep> diff --git a/java/lib/poms/jetty-websocket-7.6.10.v20130312.xml b/java/lib/poms/jetty-websocket-8.1.14.v20131031.xml index 4d3ebd1666..1592ca3d56 100644 --- a/java/lib/poms/jetty-websocket-7.6.10.v20130312.xml +++ b/java/lib/poms/jetty-websocket-8.1.14.v20131031.xml @@ -18,7 +18,7 @@ <dep> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-websocket</artifactId> - <version>7.6.10.v20130312</version> + <version>8.1.14.v20131031</version> <exclusions> <exclusion> <groupId>org.eclipse.jetty</groupId> diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index 861b225e6f..f92a133919 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -624,32 +624,10 @@ public class MessageStoreTest extends QpidTestCase storedMessage.flushToStore(); final AMQMessage currentMessage = new AMQMessage(storedMessage); - final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage, InstanceProperties.EMPTY); ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore()); - - trans.enqueue(destinationQueues, currentMessage, new ServerTransaction.Action() { - public void postCommit() - { - try - { - for(BaseQueue queue : destinationQueues) - { - queue.enqueue(currentMessage); - } - } - catch (AMQException e) - { - _logger.error("Problem enqueing message", e); - } - } - - public void onRollback() - { - //To change body of implemented methods use File | Settings | File Templates. - } - }); + exchange.send(currentMessage, InstanceProperties.EMPTY, trans, null); } |