summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-06 01:14:39 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-06 01:14:39 +0000
commit9174267aac0e56de88cf2b7f0e5bfb11721a7cde (patch)
treed634a98939055fa032e8f1285d72686fe6ef98eb
parent19d0f7296faa5b57c19a1e17a1925be6f0896d15 (diff)
downloadqpid-python-9174267aac0e56de88cf2b7f0e5bfb11721a7cde.tar.gz
remove enqueue(..) from MessageSource
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1565033 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java4
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java2
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java2
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java2
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java59
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java16
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java6
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java2
10 files changed, 74 insertions, 25 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
index 1ef06728f8..06ff76f103 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
@@ -54,8 +54,6 @@ public interface MessageSource<C extends Consumer> extends TransactionLogResourc
boolean isExclusive();
- void enqueue(ServerMessage message) throws AMQException;
-
interface ConsumerRegistrationListener
{
void consumerAdded(AMQQueue queue, Consumer consumer);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 13fe4e6b72..f2ce20c74d 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -789,7 +789,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
try
{
- toQueue.enqueue(message);
+ toQueue.enqueue(message, null);
}
catch(AMQException e)
{
@@ -818,7 +818,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
try
{
- toQueue.enqueue(message);
+ toQueue.enqueue(message, null);
}
catch (AMQException e)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 8fe9d31e3e..65007df2d9 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -631,10 +631,6 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
// ------ Enqueue / Dequeue
- public void enqueue(ServerMessage message) throws AMQException
- {
- enqueue(message, null);
- }
public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index f82490ee79..6e36cdfa94 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -292,7 +292,7 @@ public class VirtualHostConfigRecoveryHandler implements
count = 0;
}
- queue.enqueue(message);
+ queue.enqueue(message,null);
_queueRecoveries.put(queueName, ++count);
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index f2b53f95c3..0c350c838e 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -1214,7 +1214,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
// Put message on queue
try
{
- queue.enqueue(message);
+ queue.enqueue(message,null);
}
catch (AMQException e)
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index d9352f34f7..c805956b83 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -145,7 +145,7 @@ public class AckTest extends QpidTestCase
try
{
- _queue.enqueue(message);
+ _queue.enqueue(message,null);
}
catch (AMQException e)
{
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
new file mode 100644
index 0000000000..6f37d2d831
--- /dev/null
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.txn.ServerTransaction;
+
+public class MessageSourceDestination implements SendingDestination
+{
+ private static final Logger _logger = Logger.getLogger(MessageSourceDestination.class);
+ private static final Accepted ACCEPTED = new Accepted();
+ private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
+
+
+ private MessageSource _queue;
+
+ public MessageSourceDestination(MessageSource queue)
+ {
+ _queue = queue;
+ }
+
+ public Outcome[] getOutcomes()
+ {
+ return OUTCOMES;
+ }
+
+ public int getCredit()
+ {
+ // TODO - fix
+ return 100;
+ }
+
+ public MessageSource getQueue()
+ {
+ return _queue;
+ }
+
+}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
index c2d124b427..3d6bb5e3db 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
@@ -29,18 +29,16 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.ServerTransaction;
-public class QueueDestination implements SendingDestination, ReceivingDestination
+public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination
{
private static final Logger _logger = Logger.getLogger(QueueDestination.class);
private static final Accepted ACCEPTED = new Accepted();
private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
- private MessageSource _queue;
-
- public QueueDestination(MessageSource queue)
+ public QueueDestination(AMQQueue queue)
{
- _queue = queue;
+ super(queue);
}
public Outcome[] getOutcomes()
@@ -53,7 +51,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio
try
{
- txn.enqueue(_queue,message, new ServerTransaction.Action()
+ txn.enqueue(getQueue(),message, new ServerTransaction.Action()
{
@@ -61,7 +59,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio
{
try
{
- _queue.enqueue(message);
+ getQueue().enqueue(message,null);
}
catch (Exception e)
{
@@ -91,9 +89,9 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio
return 100;
}
- public MessageSource getQueue()
+ public AMQQueue getQueue()
{
- return _queue;
+ return (AMQQueue) super.getQueue();
}
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index f7867d6178..9e0327fe76 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -118,17 +118,15 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
boolean noLocal = false;
JMSSelectorFilter messageFilter = null;
- if(destination instanceof QueueDestination)
+ if(destination instanceof MessageSourceDestination)
{
- _queue = ((QueueDestination) _destination).getQueue();
+ _queue = ((MessageSourceDestination) _destination).getQueue();
if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic"))
{
source.setDistributionMode(StdDistMode.COPY);
}
- qd = (QueueDestination) destination;
-
Map<Symbol,Filter> filters = source.getFilter();
Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index c7508fa913..beed6be84b 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -115,7 +115,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
if(queue != null)
{
- destination = new QueueDestination(queue);
+ destination = new MessageSourceDestination(queue);