diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-07-20 22:18:21 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-07-20 22:18:21 +0000 |
commit | 31edffa7eebca19569716e4d08857109ea6a3a02 (patch) | |
tree | 487f7fefdb72ccefe6e01a2f1b42fef80466323a | |
parent | 65662926c2fba7235ae937c3392ca1afbd04e044 (diff) | |
download | qpid-python-31edffa7eebca19569716e4d08857109ea6a3a02.tar.gz |
Updated to get basic subscription functionality
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@796044 13f79535-47bb-0310-9956-ffa450edef68
19 files changed, 544 insertions, 20 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java index bfed4f4c60..8a9d547b55 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java @@ -33,6 +33,16 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager _bytesCredit = new AtomicLong(initialCredit); } + public long getMessageCredit() + { + return -1L; + } + + public long getBytesCredit() + { + return _bytesCredit.get(); + } + public void addCredit(long messageCredit, long bytesCredit) { _bytesCredit.addAndGet(bytesCredit); @@ -71,4 +81,9 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager } } + + public void setBytesCredit(long bytesCredit) + { + _bytesCredit.set( bytesCredit ); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java index 089a34dde3..96e86da54a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java @@ -25,6 +25,9 @@ import org.apache.qpid.server.message.ServerMessage; */ public interface FlowCreditManager { + long getMessageCredit(); + + long getBytesCredit(); public static interface FlowCreditManagerListener { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java index f3f36700d8..3802dcf0f2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java @@ -24,6 +24,16 @@ import org.apache.qpid.server.message.ServerMessage; */ public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager { + public long getMessageCredit() + { + return -1L; + } + + public long getBytesCredit() + { + return -1L; + } + public void addCredit(long messageCredit, long bytesCredit) { } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java index 706bdcae61..dcbb37c153 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java @@ -27,14 +27,24 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl private long _messageCredit; private long _bytesCredit; - MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit) + public MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit) { _messageCredit = messageCredit; _bytesCredit = bytesCredit; } - public synchronized void addCredit(long messageCredit, long bytesCredit) + public synchronized long getMessageCredit() + { + return _messageCredit; + } + + public synchronized long getBytesCredit() { + return _bytesCredit; + } + + public synchronized void addCredit(long messageCredit, long bytesCredit) + { _messageCredit += messageCredit; _bytesCredit += bytesCredit; setSuspended(hasCredit()); @@ -74,4 +84,9 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl } } + + public synchronized void setBytesCredit(long bytesCredit) + { + _bytesCredit = bytesCredit; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java index abbc91a1ee..3c84af2228 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java @@ -33,10 +33,21 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen _messageCredit = new AtomicLong(initialCredit); } + public long getMessageCredit() + { + return _messageCredit.get(); + } + + public long getBytesCredit() + { + return -1L; + } + public void addCredit(long messageCredit, long bytesCredit) { - setSuspended(false); _messageCredit.addAndGet(messageCredit); + setSuspended(false); + } public void removeAllCredit() @@ -73,4 +84,5 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen } } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java index 17710df3ee..77bbc82a14 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java @@ -81,6 +81,16 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } + public long getMessageCredit() + { + return _messageCredit; + } + + public long getBytesCredit() + { + return _bytesCredit; + } + public synchronized void addCredit(final long messageCredit, final long bytesCredit) { final long messageCreditLimit = _messageCreditLimit; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java index 38b647bfd1..5b08b9fb52 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.message; import org.apache.qpid.transport.*; import java.util.concurrent.atomic.AtomicLong; +import java.nio.ByteBuffer; public class MessageTransferMessage implements InboundMessage, ServerMessage @@ -96,4 +97,17 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage { return _arrivalTime; } + + public Header getHeader() + { + return _xfr.getHeader(); + + } + + public ByteBuffer getBody() + { + return _xfr.getBody(); + } + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 1deb465127..589f6919d5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -160,6 +160,8 @@ public interface QueueEntry extends Comparable<QueueEntry> void setRedelivered(boolean b); + boolean isRedelivered(); + Subscription getDeliveredSubscription(); void reject(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 4cb07c3006..d69f4271d9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -199,6 +199,11 @@ public class QueueEntryImpl implements QueueEntry _redelivered = b; } + public boolean isRedelivered() + { + return _redelivered; + } + public Subscription getDeliveredSubscription() { EntryState state = _state; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a96a6d624a..485c8bd96a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1214,8 +1214,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { unregisterSubscription(sub); - ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); + sub.confirmAutoClose(); + } else if (!atTail) { @@ -1396,8 +1396,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { unregisterSubscription(sub); - ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); + sub.confirmAutoClose(); } } else diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 9419572399..cc6b00609a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -30,6 +30,7 @@ public interface Subscription { + public static enum State { ACTIVE, @@ -48,8 +49,6 @@ public interface Subscription void setQueue(AMQQueue queue); - AMQChannel getChannel(); - AMQShortString getConsumerTag(); boolean isSuspended(); @@ -64,8 +63,6 @@ public interface Subscription void close(); - boolean filtersMessages(); - void send(QueueEntry msg) throws AMQException; void queueDeleted(AMQQueue queue); @@ -74,9 +71,8 @@ public interface Subscription boolean wouldSuspend(QueueEntry msg); void getSendLock(); - void releaseSendLock(); - void resend(final QueueEntry entry) throws AMQException; + void releaseSendLock(); void restoreCredit(final QueueEntry queueEntry); @@ -91,6 +87,7 @@ public interface Subscription boolean isActive(); + void confirmAutoClose(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index dc69e21731..5bb746e55f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -32,6 +32,7 @@ import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQQueue; @@ -374,6 +375,14 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean hasInterest(QueueEntry entry) { + + // TODO 0-10 to 0-8 conversion + if(!(entry.getMessage() instanceof AMQMessage)) + { + return false; + } + + //check that the message hasn't been rejected if (entry.isRejectedBy(this)) { @@ -516,11 +525,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _stateChangeLock.unlock(); } - public void resend(final QueueEntry entry) throws AMQException - { - _queue.resend(entry, this); - } - public AMQChannel getChannel() { return _channel; @@ -617,4 +621,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return _owningState; } + public void confirmAutoClose() + { + ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag()); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java new file mode 100644 index 0000000000..bdb3bf1b86 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -0,0 +1,361 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.transport.ServerSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.AMQException; +import org.apache.qpid.transport.*; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.ArrayList; + +import sun.awt.X11.XSystemTrayPeer; + +public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener +{ + private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); + private final Lock _stateChangeLock = new ReentrantLock(); + + private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); + private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null); + private final AtomicBoolean _deleted = new AtomicBoolean(false); + + + private FlowCreditManager _creditManager; + + + private StateListener _stateListener = new StateListener() + { + + public void stateChange(Subscription sub, State oldState, State newState) + { + + } + }; + private AMQQueue _queue; + private final String _destination; + private boolean _noLocal; + private final FilterManager _filters; + private final MessageAcceptMode _acceptMode; + private final MessageAcquireMode _acquireMode; + private final ServerSession _session; + + + public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, FlowCreditManager creditManager, FilterManager filters) + { + _session = session; + _destination = destination; + _acceptMode = acceptMode; + _acquireMode = acquireMode; + _creditManager = creditManager; + _filters = filters; + _creditManager.addStateListener(this); + + } + + public AMQQueue getQueue() + { + return _queue; + } + + public QueueEntry.SubscriptionAcquiredState getOwningState() + { + return _owningState; + } + + public void setQueue(AMQQueue queue) + { + if(getQueue() != null) + { + throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); + } + _queue = queue; + } + + public AMQShortString getConsumerTag() + { + return new AMQShortString(_destination); + } + + public boolean isSuspended() + { + return !isActive() || _deleted.get(); // TODO check for Session suspension + } + + public boolean hasInterest(QueueEntry entry) + { + + //TODO 0-8/9 to 0-10 conversion + if(!(entry.getMessage() instanceof MessageTransferMessage)) + { + return false; + } + + //check that the message hasn't been rejected + if (entry.isRejectedBy(this)) + { + + return false; + } + + + + if (_noLocal) + { + + + } + + + return checkFilters(entry); + + + } + + private boolean checkFilters(QueueEntry entry) + { + return (_filters == null) || _filters.allAllow(entry.getMessage()); + } + + public boolean isAutoClose() + { + // no such thing in 0-10 + return false; + } + + public boolean isClosed() + { + return getState() == State.CLOSED; + } + + public boolean isBrowser() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void close() + { + boolean closed = false; + State state = getState(); + + _stateChangeLock.lock(); + try + { + while(!closed && state != State.CLOSED) + { + closed = _state.compareAndSet(state, State.CLOSED); + if(!closed) + { + state = getState(); + } + else + { + _stateListener.stateChange(this,state, State.CLOSED); + } + } + _creditManager.removeListener(this); + } + finally + { + _stateChangeLock.unlock(); + } + + + + } + + public void creditStateChanged(boolean hasCredit) + { + + if(hasCredit) + { + if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) + { + _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); + } + else + { + // this is a hack to get round the issue of increasing bytes credit + _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE); + } + } + else + { + if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + { + _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + } + } + } + + + public void send(QueueEntry entry) throws AMQException + { + ServerMessage serverMsg = entry.getMessage(); + + + MessageTransferMessage msg = (MessageTransferMessage) serverMsg; + + + + MessageTransfer xfr = new MessageTransfer(); + xfr.setDestination(_destination); + xfr.setBody(msg.getBody()); + xfr.setAcceptMode(_acceptMode); + xfr.setAcquireMode(_acquireMode); + + Struct[] headers = msg.getHeader().getStructs(); + + ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length); + DeliveryProperties origDeliveryProps = null; + for(Struct header : headers) + { + if(header instanceof DeliveryProperties) + { + origDeliveryProps = (DeliveryProperties) header; + } + else + { + newHeaders.add(header); + } + } + + DeliveryProperties deliveryProps = new DeliveryProperties(); + if(origDeliveryProps != null) + { + if(origDeliveryProps.hasDeliveryMode()) + { + deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode()); + } + if(origDeliveryProps.hasExchange()) + { + deliveryProps.setExchange(origDeliveryProps.getExchange()); + } + if(origDeliveryProps.hasExpiration()) + { + deliveryProps.setExpiration(origDeliveryProps.getExpiration()); + } + if(origDeliveryProps.hasPriority()) + { + deliveryProps.setPriority(origDeliveryProps.getPriority()); + } + if(origDeliveryProps.hasRoutingKey()) + { + deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey()); + } + + } + + deliveryProps.setRedelivered(entry.isRedelivered()); + + newHeaders.add(deliveryProps); + xfr.setHeader(new Header(newHeaders)); + + + _session.sendMessage(xfr); + + + } + + public void queueDeleted(AMQQueue queue) + { + _deleted.set(true); + } + + public boolean wouldSuspend(QueueEntry msg) + { + return !_creditManager.useCreditForMessage(msg.getMessage()); + } + + public void getSendLock() + { + _stateChangeLock.lock(); + } + + public void releaseSendLock() + { + _stateChangeLock.unlock(); + } + + public void restoreCredit(QueueEntry queueEntry) + { + _creditManager.addCredit(1, queueEntry.getSize()); + } + + public void setStateListener(StateListener listener) + { + _stateListener = listener; + } + + public State getState() + { + return _state.get(); + } + + public QueueEntry getLastSeenEntry() + { + return _queueContext.get(); + } + + public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue) + { + return _queueContext.compareAndSet(expected, newValue); + } + + public boolean isActive() + { + return getState() == State.ACTIVE; + } + + public void confirmAutoClose() + { + //No such thing in 0-10 + } + + + public FlowCreditManager getCreditManager() + { + return _creditManager; + } + + public void setCreditManager(FlowCreditManager creditManager) + { + _creditManager.removeListener(this); + + _creditManager = creditManager; + + creditManager.addStateListener(this); + + } + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 0271949101..d8ec0d881e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -57,4 +57,9 @@ public class ServerSession extends Session e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } + + public void sendMessage(MessageTransfer xfr) + { + invoke(xfr); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index e9cf1b6474..3f88578084 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -30,13 +30,19 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.subscription.Subscription_0_10; +import org.apache.qpid.server.flow.*; import org.apache.qpid.AMQException; import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.nio.ByteBuffer; public class ServerSessionDelegate extends SessionDelegate { private final IApplicationRegistry _appRegistry; + private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>(); public ServerSessionDelegate(IApplicationRegistry appRegistry) { @@ -76,7 +82,31 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageSubscribe(Session session, MessageSubscribe method) { - super.messageSubscribe(session, method); + String destination = method.getDestination(); + String queueName = method.getQueue(); + QueueRegistry queueRegistry = getQueueRegistry(session); + + AMQQueue queue = queueRegistry.getQueue(queueName); + + //TODO null check + + FlowCreditManager creditManager = new MessageOnlyCreditManager(0L); + + // TODO filters + + Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null); + + _subscriptions.put(destination, sub); + try + { + queue.registerSubscription(sub, method.getExclusive()); + } + catch (AMQException e) + { + // TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } @@ -399,4 +429,21 @@ public class ServerSessionDelegate extends SessionDelegate { super.queueQuery(session, method); } + + + @Override + public void messageFlow(Session ssn, MessageFlow flow) + { + String destination = flow.getDestination(); + + Subscription_0_10 sub = _subscriptions.get(destination); + + FlowCreditManager creditManager = sub.getCreditManager(); + + if(flow.getUnit() == MessageCreditUnit.MESSAGE) + { + creditManager.addCredit(flow.getValue(), 0L); + } + + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 6634eb3e60..db0ec1c4fa 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -338,6 +338,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } + public boolean isRedelivered() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + public Subscription getDeliveredSubscription() { return null; //To change body of implemented methods use File | Settings | File Templates. diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 850e241cbd..08b4573f33 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -177,7 +177,12 @@ public class MockQueueEntry implements QueueEntry } - + public boolean isRedelivered() + { + return false; + } + + public int compareTo(QueueEntry o) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 33fd669d5c..c8ca126136 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -99,6 +99,11 @@ public class MockSubscription implements Subscription return true; } + public void confirmAutoClose() + { + + } + public boolean isAutoClose() { return false; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 66ec9686dd..73cb8cd2cd 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -146,6 +146,11 @@ public class SubscriptionTestHelper implements Subscription return false; //To change body of implemented methods use File | Settings | File Templates. } + public void confirmAutoClose() + { + //To change body of implemented methods use File | Settings | File Templates. + } + public AMQQueue getQueue() { return null; |