summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-07-20 22:18:21 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-07-20 22:18:21 +0000
commit31edffa7eebca19569716e4d08857109ea6a3a02 (patch)
tree487f7fefdb72ccefe6e01a2f1b42fef80466323a
parent65662926c2fba7235ae937c3392ca1afbd04e044 (diff)
downloadqpid-python-31edffa7eebca19569716e4d08857109ea6a3a02.tar.gz
Updated to get basic subscription functionality
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@796044 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java19
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java19
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java361
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java49
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java5
19 files changed, 544 insertions, 20 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
index bfed4f4c60..8a9d547b55 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
@@ -33,6 +33,16 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager
_bytesCredit = new AtomicLong(initialCredit);
}
+ public long getMessageCredit()
+ {
+ return -1L;
+ }
+
+ public long getBytesCredit()
+ {
+ return _bytesCredit.get();
+ }
+
public void addCredit(long messageCredit, long bytesCredit)
{
_bytesCredit.addAndGet(bytesCredit);
@@ -71,4 +81,9 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager
}
}
+
+ public void setBytesCredit(long bytesCredit)
+ {
+ _bytesCredit.set( bytesCredit );
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
index 089a34dde3..96e86da54a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
@@ -25,6 +25,9 @@ import org.apache.qpid.server.message.ServerMessage;
*/
public interface FlowCreditManager
{
+ long getMessageCredit();
+
+ long getBytesCredit();
public static interface FlowCreditManagerListener
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
index f3f36700d8..3802dcf0f2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
@@ -24,6 +24,16 @@ import org.apache.qpid.server.message.ServerMessage;
*/
public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
+ public long getMessageCredit()
+ {
+ return -1L;
+ }
+
+ public long getBytesCredit()
+ {
+ return -1L;
+ }
+
public void addCredit(long messageCredit, long bytesCredit)
{
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
index 706bdcae61..dcbb37c153 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
@@ -27,14 +27,24 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl
private long _messageCredit;
private long _bytesCredit;
- MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit)
+ public MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit)
{
_messageCredit = messageCredit;
_bytesCredit = bytesCredit;
}
- public synchronized void addCredit(long messageCredit, long bytesCredit)
+ public synchronized long getMessageCredit()
+ {
+ return _messageCredit;
+ }
+
+ public synchronized long getBytesCredit()
{
+ return _bytesCredit;
+ }
+
+ public synchronized void addCredit(long messageCredit, long bytesCredit)
+ {
_messageCredit += messageCredit;
_bytesCredit += bytesCredit;
setSuspended(hasCredit());
@@ -74,4 +84,9 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl
}
}
+
+ public synchronized void setBytesCredit(long bytesCredit)
+ {
+ _bytesCredit = bytesCredit;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
index abbc91a1ee..3c84af2228 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
@@ -33,10 +33,21 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen
_messageCredit = new AtomicLong(initialCredit);
}
+ public long getMessageCredit()
+ {
+ return _messageCredit.get();
+ }
+
+ public long getBytesCredit()
+ {
+ return -1L;
+ }
+
public void addCredit(long messageCredit, long bytesCredit)
{
- setSuspended(false);
_messageCredit.addAndGet(messageCredit);
+ setSuspended(false);
+
}
public void removeAllCredit()
@@ -73,4 +84,5 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen
}
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
index 17710df3ee..77bbc82a14 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
@@ -81,6 +81,16 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
}
+ public long getMessageCredit()
+ {
+ return _messageCredit;
+ }
+
+ public long getBytesCredit()
+ {
+ return _bytesCredit;
+ }
+
public synchronized void addCredit(final long messageCredit, final long bytesCredit)
{
final long messageCreditLimit = _messageCreditLimit;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
index 38b647bfd1..5b08b9fb52 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.message;
import org.apache.qpid.transport.*;
import java.util.concurrent.atomic.AtomicLong;
+import java.nio.ByteBuffer;
public class MessageTransferMessage implements InboundMessage, ServerMessage
@@ -96,4 +97,17 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage
{
return _arrivalTime;
}
+
+ public Header getHeader()
+ {
+ return _xfr.getHeader();
+
+ }
+
+ public ByteBuffer getBody()
+ {
+ return _xfr.getBody();
+ }
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 1deb465127..589f6919d5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -160,6 +160,8 @@ public interface QueueEntry extends Comparable<QueueEntry>
void setRedelivered(boolean b);
+ boolean isRedelivered();
+
Subscription getDeliveredSubscription();
void reject();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 4cb07c3006..d69f4271d9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -199,6 +199,11 @@ public class QueueEntryImpl implements QueueEntry
_redelivered = b;
}
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
+
public Subscription getDeliveredSubscription()
{
EntryState state = _state;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a96a6d624a..485c8bd96a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1214,8 +1214,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
unregisterSubscription(sub);
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
+ sub.confirmAutoClose();
+
}
else if (!atTail)
{
@@ -1396,8 +1396,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
unregisterSubscription(sub);
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
+ sub.confirmAutoClose();
}
}
else
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 9419572399..cc6b00609a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -30,6 +30,7 @@ public interface Subscription
{
+
public static enum State
{
ACTIVE,
@@ -48,8 +49,6 @@ public interface Subscription
void setQueue(AMQQueue queue);
- AMQChannel getChannel();
-
AMQShortString getConsumerTag();
boolean isSuspended();
@@ -64,8 +63,6 @@ public interface Subscription
void close();
- boolean filtersMessages();
-
void send(QueueEntry msg) throws AMQException;
void queueDeleted(AMQQueue queue);
@@ -74,9 +71,8 @@ public interface Subscription
boolean wouldSuspend(QueueEntry msg);
void getSendLock();
- void releaseSendLock();
- void resend(final QueueEntry entry) throws AMQException;
+ void releaseSendLock();
void restoreCredit(final QueueEntry queueEntry);
@@ -91,6 +87,7 @@ public interface Subscription
boolean isActive();
+ void confirmAutoClose();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index dc69e21731..5bb746e55f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -32,6 +32,7 @@ import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.AMQQueue;
@@ -374,6 +375,14 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean hasInterest(QueueEntry entry)
{
+
+ // TODO 0-10 to 0-8 conversion
+ if(!(entry.getMessage() instanceof AMQMessage))
+ {
+ return false;
+ }
+
+
//check that the message hasn't been rejected
if (entry.isRejectedBy(this))
{
@@ -516,11 +525,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
_stateChangeLock.unlock();
}
- public void resend(final QueueEntry entry) throws AMQException
- {
- _queue.resend(entry, this);
- }
-
public AMQChannel getChannel()
{
return _channel;
@@ -617,4 +621,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return _owningState;
}
+ public void confirmAutoClose()
+ {
+ ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
new file mode 100644
index 0000000000..bdb3bf1b86
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -0,0 +1,361 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.*;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+
+import sun.awt.X11.XSystemTrayPeer;
+
+public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener
+{
+ private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final Lock _stateChangeLock = new ReentrantLock();
+
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
+ private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+ private FlowCreditManager _creditManager;
+
+
+ private StateListener _stateListener = new StateListener()
+ {
+
+ public void stateChange(Subscription sub, State oldState, State newState)
+ {
+
+ }
+ };
+ private AMQQueue _queue;
+ private final String _destination;
+ private boolean _noLocal;
+ private final FilterManager _filters;
+ private final MessageAcceptMode _acceptMode;
+ private final MessageAcquireMode _acquireMode;
+ private final ServerSession _session;
+
+
+ public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode, FlowCreditManager creditManager, FilterManager filters)
+ {
+ _session = session;
+ _destination = destination;
+ _acceptMode = acceptMode;
+ _acquireMode = acquireMode;
+ _creditManager = creditManager;
+ _filters = filters;
+ _creditManager.addStateListener(this);
+
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public QueueEntry.SubscriptionAcquiredState getOwningState()
+ {
+ return _owningState;
+ }
+
+ public void setQueue(AMQQueue queue)
+ {
+ if(getQueue() != null)
+ {
+ throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
+ }
+ _queue = queue;
+ }
+
+ public AMQShortString getConsumerTag()
+ {
+ return new AMQShortString(_destination);
+ }
+
+ public boolean isSuspended()
+ {
+ return !isActive() || _deleted.get(); // TODO check for Session suspension
+ }
+
+ public boolean hasInterest(QueueEntry entry)
+ {
+
+ //TODO 0-8/9 to 0-10 conversion
+ if(!(entry.getMessage() instanceof MessageTransferMessage))
+ {
+ return false;
+ }
+
+ //check that the message hasn't been rejected
+ if (entry.isRejectedBy(this))
+ {
+
+ return false;
+ }
+
+
+
+ if (_noLocal)
+ {
+
+
+ }
+
+
+ return checkFilters(entry);
+
+
+ }
+
+ private boolean checkFilters(QueueEntry entry)
+ {
+ return (_filters == null) || _filters.allAllow(entry.getMessage());
+ }
+
+ public boolean isAutoClose()
+ {
+ // no such thing in 0-10
+ return false;
+ }
+
+ public boolean isClosed()
+ {
+ return getState() == State.CLOSED;
+ }
+
+ public boolean isBrowser()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void close()
+ {
+ boolean closed = false;
+ State state = getState();
+
+ _stateChangeLock.lock();
+ try
+ {
+ while(!closed && state != State.CLOSED)
+ {
+ closed = _state.compareAndSet(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ else
+ {
+ _stateListener.stateChange(this,state, State.CLOSED);
+ }
+ }
+ _creditManager.removeListener(this);
+ }
+ finally
+ {
+ _stateChangeLock.unlock();
+ }
+
+
+
+ }
+
+ public void creditStateChanged(boolean hasCredit)
+ {
+
+ if(hasCredit)
+ {
+ if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ {
+ _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ }
+ else
+ {
+ // this is a hack to get round the issue of increasing bytes credit
+ _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
+ }
+ }
+ else
+ {
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ }
+ }
+
+
+ public void send(QueueEntry entry) throws AMQException
+ {
+ ServerMessage serverMsg = entry.getMessage();
+
+
+ MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
+
+
+
+ MessageTransfer xfr = new MessageTransfer();
+ xfr.setDestination(_destination);
+ xfr.setBody(msg.getBody());
+ xfr.setAcceptMode(_acceptMode);
+ xfr.setAcquireMode(_acquireMode);
+
+ Struct[] headers = msg.getHeader().getStructs();
+
+ ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
+ DeliveryProperties origDeliveryProps = null;
+ for(Struct header : headers)
+ {
+ if(header instanceof DeliveryProperties)
+ {
+ origDeliveryProps = (DeliveryProperties) header;
+ }
+ else
+ {
+ newHeaders.add(header);
+ }
+ }
+
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ if(origDeliveryProps != null)
+ {
+ if(origDeliveryProps.hasDeliveryMode())
+ {
+ deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
+ }
+ if(origDeliveryProps.hasExchange())
+ {
+ deliveryProps.setExchange(origDeliveryProps.getExchange());
+ }
+ if(origDeliveryProps.hasExpiration())
+ {
+ deliveryProps.setExpiration(origDeliveryProps.getExpiration());
+ }
+ if(origDeliveryProps.hasPriority())
+ {
+ deliveryProps.setPriority(origDeliveryProps.getPriority());
+ }
+ if(origDeliveryProps.hasRoutingKey())
+ {
+ deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
+ }
+
+ }
+
+ deliveryProps.setRedelivered(entry.isRedelivered());
+
+ newHeaders.add(deliveryProps);
+ xfr.setHeader(new Header(newHeaders));
+
+
+ _session.sendMessage(xfr);
+
+
+ }
+
+ public void queueDeleted(AMQQueue queue)
+ {
+ _deleted.set(true);
+ }
+
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return !_creditManager.useCreditForMessage(msg.getMessage());
+ }
+
+ public void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+ public void restoreCredit(QueueEntry queueEntry)
+ {
+ _creditManager.addCredit(1, queueEntry.getSize());
+ }
+
+ public void setStateListener(StateListener listener)
+ {
+ _stateListener = listener;
+ }
+
+ public State getState()
+ {
+ return _state.get();
+ }
+
+ public QueueEntry getLastSeenEntry()
+ {
+ return _queueContext.get();
+ }
+
+ public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
+ {
+ return _queueContext.compareAndSet(expected, newValue);
+ }
+
+ public boolean isActive()
+ {
+ return getState() == State.ACTIVE;
+ }
+
+ public void confirmAutoClose()
+ {
+ //No such thing in 0-10
+ }
+
+
+ public FlowCreditManager getCreditManager()
+ {
+ return _creditManager;
+ }
+
+ public void setCreditManager(FlowCreditManager creditManager)
+ {
+ _creditManager.removeListener(this);
+
+ _creditManager = creditManager;
+
+ creditManager.addStateListener(this);
+
+ }
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 0271949101..d8ec0d881e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -57,4 +57,9 @@ public class ServerSession extends Session
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
+
+ public void sendMessage(MessageTransfer xfr)
+ {
+ invoke(xfr);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index e9cf1b6474..3f88578084 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -30,13 +30,19 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.flow.*;
import org.apache.qpid.AMQException;
import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.nio.ByteBuffer;
public class ServerSessionDelegate extends SessionDelegate
{
private final IApplicationRegistry _appRegistry;
+ private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>();
public ServerSessionDelegate(IApplicationRegistry appRegistry)
{
@@ -76,7 +82,31 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageSubscribe(Session session, MessageSubscribe method)
{
- super.messageSubscribe(session, method);
+ String destination = method.getDestination();
+ String queueName = method.getQueue();
+ QueueRegistry queueRegistry = getQueueRegistry(session);
+
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+
+ //TODO null check
+
+ FlowCreditManager creditManager = new MessageOnlyCreditManager(0L);
+
+ // TODO filters
+
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null);
+
+ _subscriptions.put(destination, sub);
+ try
+ {
+ queue.registerSubscription(sub, method.getExclusive());
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
}
@@ -399,4 +429,21 @@ public class ServerSessionDelegate extends SessionDelegate
{
super.queueQuery(session, method);
}
+
+
+ @Override
+ public void messageFlow(Session ssn, MessageFlow flow)
+ {
+ String destination = flow.getDestination();
+
+ Subscription_0_10 sub = _subscriptions.get(destination);
+
+ FlowCreditManager creditManager = sub.getCreditManager();
+
+ if(flow.getUnit() == MessageCreditUnit.MESSAGE)
+ {
+ creditManager.addCredit(flow.getValue(), 0L);
+ }
+
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 6634eb3e60..db0ec1c4fa 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -338,6 +338,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isRedelivered()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public Subscription getDeliveredSubscription()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 850e241cbd..08b4573f33 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -177,7 +177,12 @@ public class MockQueueEntry implements QueueEntry
}
-
+ public boolean isRedelivered()
+ {
+ return false;
+ }
+
+
public int compareTo(QueueEntry o)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 33fd669d5c..c8ca126136 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -99,6 +99,11 @@ public class MockSubscription implements Subscription
return true;
}
+ public void confirmAutoClose()
+ {
+
+ }
+
public boolean isAutoClose()
{
return false;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 66ec9686dd..73cb8cd2cd 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -146,6 +146,11 @@ public class SubscriptionTestHelper implements Subscription
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void confirmAutoClose()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public AMQQueue getQueue()
{
return null;