From 9174267aac0e56de88cf2b7f0e5bfb11721a7cde Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 6 Feb 2014 01:14:39 +0000 Subject: remove enqueue(..) from MessageSource git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1565033 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/message/MessageSource.java | 2 - .../server/model/adapter/VirtualHostAdapter.java | 4 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 4 -- .../VirtualHostConfigRecoveryHandler.java | 2 +- .../qpid/server/queue/SimpleAMQQueueTest.java | 2 +- .../apache/qpid/server/protocol/v0_8/AckTest.java | 2 +- .../protocol/v1_0/MessageSourceDestination.java | 59 ++++++++++++++++++++++ .../server/protocol/v1_0/QueueDestination.java | 16 +++--- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 6 +-- .../qpid/server/protocol/v1_0/Session_1_0.java | 2 +- 10 files changed, 74 insertions(+), 25 deletions(-) create mode 100644 java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java index 1ef06728f8..06ff76f103 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java @@ -54,8 +54,6 @@ public interface MessageSource extends TransactionLogResourc boolean isExclusive(); - void enqueue(ServerMessage message) throws AMQException; - interface ConsumerRegistrationListener { void consumerAdded(AMQQueue queue, Consumer consumer); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 13fe4e6b72..f2ce20c74d 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -789,7 +789,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { try { - toQueue.enqueue(message); + toQueue.enqueue(message, null); } catch(AMQException e) { @@ -818,7 +818,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { try { - toQueue.enqueue(message); + toQueue.enqueue(message, null); } catch (AMQException e) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 8fe9d31e3e..65007df2d9 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -631,10 +631,6 @@ public class SimpleAMQQueue implements AMQQueue, } // ------ Enqueue / Dequeue - public void enqueue(ServerMessage message) throws AMQException - { - enqueue(message, null); - } public void enqueue(ServerMessage message, Action> action) throws AMQException { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index f82490ee79..6e36cdfa94 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -292,7 +292,7 @@ public class VirtualHostConfigRecoveryHandler implements count = 0; } - queue.enqueue(message); + queue.enqueue(message,null); _queueRecoveries.put(queueName, ++count); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index f2b53f95c3..0c350c838e 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -1214,7 +1214,7 @@ public class SimpleAMQQueueTest extends QpidTestCase // Put message on queue try { - queue.enqueue(message); + queue.enqueue(message,null); } catch (AMQException e) { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index d9352f34f7..c805956b83 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -145,7 +145,7 @@ public class AckTest extends QpidTestCase try { - _queue.enqueue(message); + _queue.enqueue(message,null); } catch (AMQException e) { diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java new file mode 100644 index 0000000000..6f37d2d831 --- /dev/null +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.txn.ServerTransaction; + +public class MessageSourceDestination implements SendingDestination +{ + private static final Logger _logger = Logger.getLogger(MessageSourceDestination.class); + private static final Accepted ACCEPTED = new Accepted(); + private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED }; + + + private MessageSource _queue; + + public MessageSourceDestination(MessageSource queue) + { + _queue = queue; + } + + public Outcome[] getOutcomes() + { + return OUTCOMES; + } + + public int getCredit() + { + // TODO - fix + return 100; + } + + public MessageSource getQueue() + { + return _queue; + } + +} diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java index c2d124b427..3d6bb5e3db 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java @@ -29,18 +29,16 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.ServerTransaction; -public class QueueDestination implements SendingDestination, ReceivingDestination +public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination { private static final Logger _logger = Logger.getLogger(QueueDestination.class); private static final Accepted ACCEPTED = new Accepted(); private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED }; - private MessageSource _queue; - - public QueueDestination(MessageSource queue) + public QueueDestination(AMQQueue queue) { - _queue = queue; + super(queue); } public Outcome[] getOutcomes() @@ -53,7 +51,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio try { - txn.enqueue(_queue,message, new ServerTransaction.Action() + txn.enqueue(getQueue(),message, new ServerTransaction.Action() { @@ -61,7 +59,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio { try { - _queue.enqueue(message); + getQueue().enqueue(message,null); } catch (Exception e) { @@ -91,9 +89,9 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio return 100; } - public MessageSource getQueue() + public AMQQueue getQueue() { - return _queue; + return (AMQQueue) super.getQueue(); } } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index f7867d6178..9e0327fe76 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -118,17 +118,15 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS boolean noLocal = false; JMSSelectorFilter messageFilter = null; - if(destination instanceof QueueDestination) + if(destination instanceof MessageSourceDestination) { - _queue = ((QueueDestination) _destination).getQueue(); + _queue = ((MessageSourceDestination) _destination).getQueue(); if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic")) { source.setDistributionMode(StdDistMode.COPY); } - qd = (QueueDestination) destination; - Map filters = source.getFilter(); Map actualFilters = new HashMap(); diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index c7508fa913..beed6be84b 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -115,7 +115,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu if(queue != null) { - destination = new QueueDestination(queue); + destination = new MessageSourceDestination(queue); -- cgit v1.2.1