diff options
Diffstat (limited to 'qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java')
-rw-r--r-- | qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java | 912 |
1 files changed, 456 insertions, 456 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java index 5fbca0b695..e5019f9479 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java @@ -1,456 +1,456 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.transport;
-
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-
-import java.util.*;
-
-public class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLinkListener>
-{
-
-
- private UnsignedInteger _lastDeliveryId;
-
- private static class TransientState
- {
-
- UnsignedInteger _deliveryId;
- int _credit = 1;
- boolean _settled;
-
- private TransientState(final UnsignedInteger transferId)
- {
- _deliveryId = transferId;
- }
-
- void incrementCredit()
- {
- _credit++;
- }
-
- public int getCredit()
- {
- return _credit;
- }
-
- public UnsignedInteger getDeliveryId()
- {
- return _deliveryId;
- }
-
- public boolean isSettled()
- {
- return _settled;
- }
-
- public void setSettled(boolean settled)
- {
- _settled = settled;
- }
- }
-
- private Map<Binary, Object> _unsettledMap = new LinkedHashMap<Binary, Object>();
- private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<Binary, TransientState>();
- private boolean _creditWindow;
- private boolean _remoteDrain;
- private UnsignedInteger _remoteTransferCount;
- private UnsignedInteger _drainLimit;
-
-
- public ReceivingLinkEndpoint(final SessionEndpoint session, String name)
- {
- this(session,name,null);
- }
-
- public ReceivingLinkEndpoint(final SessionEndpoint session, String name, Map<Binary, Outcome> unsettledMap)
- {
- super(session, name, unsettledMap);
- setDeliveryCount(UnsignedInteger.valueOf(0));
- setLinkEventListener(ReceivingLinkListener.DEFAULT);
- }
-
- public ReceivingLinkEndpoint(final SessionEndpoint session, final Attach attach)
- {
- super(session, attach);
- setDeliveryCount(attach.getInitialDeliveryCount());
- setLinkEventListener(ReceivingLinkListener.DEFAULT);
- setSendingSettlementMode(attach.getSndSettleMode());
- setReceivingSettlementMode(attach.getRcvSettleMode());
- }
-
-
- @Override public Role getRole()
- {
- return Role.RECEIVER;
- }
-
- @Override
- public void receiveTransfer(final Transfer transfer, final Delivery delivery)
- {
- synchronized (getLock())
- {
- TransientState transientState;
- final Binary deliveryTag = delivery.getDeliveryTag();
- boolean existingState = _unsettledMap.containsKey(deliveryTag);
- if(!existingState || transfer.getState() != null)
- {
- _unsettledMap.put(deliveryTag, transfer.getState());
- }
- if(!existingState)
- {
- transientState = new TransientState(transfer.getDeliveryId());
- if(delivery.isSettled())
- {
- transientState.setSettled(true);
- }
- _unsettledIds.put(deliveryTag, transientState);
- setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
- setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE));
-
- }
- else
- {
- transientState = _unsettledIds.get(deliveryTag);
- transientState.incrementCredit();
- if(delivery.isSettled())
- {
- transientState.setSettled(true);
- }
- }
-
- if(transientState.isSettled() && delivery.isComplete())
- {
- _unsettledMap.remove(deliveryTag);
- }
- getLinkEventListener().messageTransfer(transfer);
-
-
- getLock().notifyAll();
- }
- }
-
- @Override public void receiveFlow(final Flow flow)
- {
- synchronized (getLock())
- {
- super.receiveFlow(flow);
- _remoteDrain = Boolean.TRUE.equals((Boolean)flow.getDrain());
- setAvailable(flow.getAvailable());
- setDeliveryCount(flow.getDeliveryCount());
- getLock().notifyAll();
- }
- }
-
-
- public boolean isDrained()
- {
- return getDrain() && getDeliveryCount().equals(getDrainLimit());
- }
-
- @Override
- public void settledByPeer(final Binary deliveryTag)
- {
- synchronized (getLock())
- {
- // TODO XXX : need to do anything about the window here?
- if(settled(deliveryTag) && _creditWindow)
- {
- sendFlowConditional();
- }
- }
- }
-
- public boolean settled(final Binary deliveryTag)
- {
- synchronized(getLock())
- {
- boolean deleted;
- if(deleted = (_unsettledIds.remove(deliveryTag) != null))
- {
- _unsettledMap.remove(deliveryTag);
-
- getLock().notifyAll();
- }
-
- return deleted;
- }
- }
-
- public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
- {
- synchronized(getLock())
- {
- if(_unsettledMap.containsKey(deliveryTag))
- {
- boolean outcomeUpdate = false;
- Outcome outcome=null;
- if(state instanceof Outcome)
- {
- outcome = (Outcome)state;
- }
- else if(state instanceof TransactionalState)
- {
- // TODO? Is this correct
- outcome = ((TransactionalState)state).getOutcome();
- }
-
- if(outcome != null)
- {
- Object oldOutcome = _unsettledMap.put(deliveryTag, outcome);
- outcomeUpdate = !outcome.equals(oldOutcome);
- }
-
-
-
-
- TransientState transientState = _unsettledIds.get(deliveryTag);
- if(outcomeUpdate || settled)
- {
-
- final UnsignedInteger transferId = transientState.getDeliveryId();
-
- getSession().updateDisposition(getRole(), transferId, transferId, state, settled);
- }
-
-
- if(settled)
- {
-
- if(settled(deliveryTag))
- {
- if(!isDetached() && _creditWindow)
- {
- setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
- sendFlowConditional();
- }
- else
- {
- getSession().sendFlowConditional();
- }
- }
- }
- getLock().notifyAll();
- }
- else
- {
- TransientState transientState = _unsettledIds.get(deliveryTag);
- if(_creditWindow)
- {
- setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
- sendFlowConditional();
- }
-
- }
- }
-
- }
-
-
- public void setCreditWindow()
- {
- setCreditWindow(true);
- }
- public void setCreditWindow(boolean window)
- {
-
- _creditWindow = window;
- sendFlowConditional();
-
- }
-
- public void drain()
- {
- synchronized (getLock())
- {
- setDrain(true);
- _creditWindow = false;
- _drainLimit = getDeliveryCount().add(getLinkCredit());
- sendFlow();
- getLock().notifyAll();
- }
- }
-
- @Override
- public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
- {
- super.receiveDeliveryState(unsettled, state, settled);
- if(_creditWindow)
- {
- if(Boolean.TRUE.equals(settled))
- {
- setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
- sendFlowConditional();
- }
- }
- }
-
- public void requestTransactionalSend(Object txnId)
- {
- synchronized (getLock())
- {
- setDrain(true);
- _creditWindow = false;
- setTransactionId(txnId);
- sendFlow();
- getLock().notifyAll();
- }
- }
-
- private void sendFlow(final Object transactionId)
- {
- sendFlow();
- }
-
-
- public void clearDrain()
- {
- synchronized (getLock())
- {
- setDrain(false);
- sendFlow();
- getLock().notifyAll();
- }
- }
-
- public void updateAllDisposition(Binary deliveryTag, DeliveryState deliveryState, boolean settled)
- {
- synchronized(getLock())
- {
- if(!_unsettledIds.isEmpty())
- {
- Binary firstTag = _unsettledIds.keySet().iterator().next();
- Binary lastTag = deliveryTag;
- updateDispositions(firstTag, lastTag, deliveryState, settled);
- }
- }
- }
-
- private void updateDispositions(Binary firstTag, Binary lastTag, DeliveryState state, boolean settled)
- {
- SortedMap<UnsignedInteger, UnsignedInteger> ranges = new TreeMap<UnsignedInteger,UnsignedInteger>();
-
- synchronized(getLock())
- {
-
- Iterator<Binary> iter = _unsettledIds.keySet().iterator();
- List<Binary> tagsToUpdate = new ArrayList<Binary>();
- Binary tag = null;
-
- while(iter.hasNext() && !(tag = iter.next()).equals(firstTag));
-
- if(firstTag.equals(tag))
- {
- tagsToUpdate.add(tag);
-
- UnsignedInteger deliveryId = _unsettledIds.get(firstTag).getDeliveryId();
-
- UnsignedInteger first = deliveryId;
- UnsignedInteger last = first;
-
- while(iter.hasNext())
- {
- tag = iter.next();
- tagsToUpdate.add(tag);
-
- deliveryId = _unsettledIds.get(tag).getDeliveryId();
-
- if(deliveryId.equals(last.add(UnsignedInteger.ONE)))
- {
- last = deliveryId;
- }
- else
- {
- ranges.put(first,last);
- first = last = deliveryId;
- }
-
- if(tag.equals(lastTag))
- {
- break;
- }
- }
-
- ranges.put(first,last);
- }
-
- if(settled)
- {
-
- for(Binary deliveryTag : tagsToUpdate)
- {
- if(settled(deliveryTag) && _creditWindow)
- {
- setLinkCredit(getLinkCredit().add(UnsignedInteger.valueOf(1)));
- }
- }
- sendFlowConditional();
- }
-
-
-
- for(Map.Entry<UnsignedInteger,UnsignedInteger> range : ranges.entrySet())
- {
- getSession().updateDisposition(getRole(), range.getKey(), range.getValue(), state, settled);
- }
-
-
- getLock().notifyAll();
- }
-
- }
-
- @Override
- public void settle(Binary deliveryTag)
- {
- super.settle(deliveryTag);
- if(_creditWindow)
- {
- sendFlowConditional();
- }
-
- }
-
- public void flowStateChanged()
- {
- }
-
- public UnsignedInteger getDrainLimit()
- {
- return _drainLimit;
- }
-
- UnsignedInteger getLastDeliveryId()
- {
- return _lastDeliveryId;
- }
-
- void setLastDeliveryId(UnsignedInteger lastDeliveryId)
- {
- _lastDeliveryId = lastDeliveryId;
- }
-
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.transport; + +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; +import org.apache.qpid.amqp_1_0.type.transport.*; + + +import java.util.*; + +public class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLinkListener> +{ + + + private UnsignedInteger _lastDeliveryId; + + private static class TransientState + { + + UnsignedInteger _deliveryId; + int _credit = 1; + boolean _settled; + + private TransientState(final UnsignedInteger transferId) + { + _deliveryId = transferId; + } + + void incrementCredit() + { + _credit++; + } + + public int getCredit() + { + return _credit; + } + + public UnsignedInteger getDeliveryId() + { + return _deliveryId; + } + + public boolean isSettled() + { + return _settled; + } + + public void setSettled(boolean settled) + { + _settled = settled; + } + } + + private Map<Binary, Object> _unsettledMap = new LinkedHashMap<Binary, Object>(); + private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<Binary, TransientState>(); + private boolean _creditWindow; + private boolean _remoteDrain; + private UnsignedInteger _remoteTransferCount; + private UnsignedInteger _drainLimit; + + + public ReceivingLinkEndpoint(final SessionEndpoint session, String name) + { + this(session,name,null); + } + + public ReceivingLinkEndpoint(final SessionEndpoint session, String name, Map<Binary, Outcome> unsettledMap) + { + super(session, name, unsettledMap); + setDeliveryCount(UnsignedInteger.valueOf(0)); + setLinkEventListener(ReceivingLinkListener.DEFAULT); + } + + public ReceivingLinkEndpoint(final SessionEndpoint session, final Attach attach) + { + super(session, attach); + setDeliveryCount(attach.getInitialDeliveryCount()); + setLinkEventListener(ReceivingLinkListener.DEFAULT); + setSendingSettlementMode(attach.getSndSettleMode()); + setReceivingSettlementMode(attach.getRcvSettleMode()); + } + + + @Override public Role getRole() + { + return Role.RECEIVER; + } + + @Override + public void receiveTransfer(final Transfer transfer, final Delivery delivery) + { + synchronized (getLock()) + { + TransientState transientState; + final Binary deliveryTag = delivery.getDeliveryTag(); + boolean existingState = _unsettledMap.containsKey(deliveryTag); + if(!existingState || transfer.getState() != null) + { + _unsettledMap.put(deliveryTag, transfer.getState()); + } + if(!existingState) + { + transientState = new TransientState(transfer.getDeliveryId()); + if(delivery.isSettled()) + { + transientState.setSettled(true); + } + _unsettledIds.put(deliveryTag, transientState); + setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE)); + setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE)); + + } + else + { + transientState = _unsettledIds.get(deliveryTag); + transientState.incrementCredit(); + if(delivery.isSettled()) + { + transientState.setSettled(true); + } + } + + if(transientState.isSettled() && delivery.isComplete()) + { + _unsettledMap.remove(deliveryTag); + } + getLinkEventListener().messageTransfer(transfer); + + + getLock().notifyAll(); + } + } + + @Override public void receiveFlow(final Flow flow) + { + synchronized (getLock()) + { + super.receiveFlow(flow); + _remoteDrain = Boolean.TRUE.equals((Boolean)flow.getDrain()); + setAvailable(flow.getAvailable()); + setDeliveryCount(flow.getDeliveryCount()); + getLock().notifyAll(); + } + } + + + public boolean isDrained() + { + return getDrain() && getDeliveryCount().equals(getDrainLimit()); + } + + @Override + public void settledByPeer(final Binary deliveryTag) + { + synchronized (getLock()) + { + // TODO XXX : need to do anything about the window here? + if(settled(deliveryTag) && _creditWindow) + { + sendFlowConditional(); + } + } + } + + public boolean settled(final Binary deliveryTag) + { + synchronized(getLock()) + { + boolean deleted; + if(deleted = (_unsettledIds.remove(deliveryTag) != null)) + { + _unsettledMap.remove(deliveryTag); + + getLock().notifyAll(); + } + + return deleted; + } + } + + public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled) + { + synchronized(getLock()) + { + if(_unsettledMap.containsKey(deliveryTag)) + { + boolean outcomeUpdate = false; + Outcome outcome=null; + if(state instanceof Outcome) + { + outcome = (Outcome)state; + } + else if(state instanceof TransactionalState) + { + // TODO? Is this correct + outcome = ((TransactionalState)state).getOutcome(); + } + + if(outcome != null) + { + Object oldOutcome = _unsettledMap.put(deliveryTag, outcome); + outcomeUpdate = !outcome.equals(oldOutcome); + } + + + + + TransientState transientState = _unsettledIds.get(deliveryTag); + if(outcomeUpdate || settled) + { + + final UnsignedInteger transferId = transientState.getDeliveryId(); + + getSession().updateDisposition(getRole(), transferId, transferId, state, settled); + } + + + if(settled) + { + + if(settled(deliveryTag)) + { + if(!isDetached() && _creditWindow) + { + setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE)); + sendFlowConditional(); + } + else + { + getSession().sendFlowConditional(); + } + } + } + getLock().notifyAll(); + } + else + { + TransientState transientState = _unsettledIds.get(deliveryTag); + if(_creditWindow) + { + setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE)); + sendFlowConditional(); + } + + } + } + + } + + + public void setCreditWindow() + { + setCreditWindow(true); + } + public void setCreditWindow(boolean window) + { + + _creditWindow = window; + sendFlowConditional(); + + } + + public void drain() + { + synchronized (getLock()) + { + setDrain(true); + _creditWindow = false; + _drainLimit = getDeliveryCount().add(getLinkCredit()); + sendFlow(); + getLock().notifyAll(); + } + } + + @Override + public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled) + { + super.receiveDeliveryState(unsettled, state, settled); + if(_creditWindow) + { + if(Boolean.TRUE.equals(settled)) + { + setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE)); + sendFlowConditional(); + } + } + } + + public void requestTransactionalSend(Object txnId) + { + synchronized (getLock()) + { + setDrain(true); + _creditWindow = false; + setTransactionId(txnId); + sendFlow(); + getLock().notifyAll(); + } + } + + private void sendFlow(final Object transactionId) + { + sendFlow(); + } + + + public void clearDrain() + { + synchronized (getLock()) + { + setDrain(false); + sendFlow(); + getLock().notifyAll(); + } + } + + public void updateAllDisposition(Binary deliveryTag, DeliveryState deliveryState, boolean settled) + { + synchronized(getLock()) + { + if(!_unsettledIds.isEmpty()) + { + Binary firstTag = _unsettledIds.keySet().iterator().next(); + Binary lastTag = deliveryTag; + updateDispositions(firstTag, lastTag, deliveryState, settled); + } + } + } + + private void updateDispositions(Binary firstTag, Binary lastTag, DeliveryState state, boolean settled) + { + SortedMap<UnsignedInteger, UnsignedInteger> ranges = new TreeMap<UnsignedInteger,UnsignedInteger>(); + + synchronized(getLock()) + { + + Iterator<Binary> iter = _unsettledIds.keySet().iterator(); + List<Binary> tagsToUpdate = new ArrayList<Binary>(); + Binary tag = null; + + while(iter.hasNext() && !(tag = iter.next()).equals(firstTag)); + + if(firstTag.equals(tag)) + { + tagsToUpdate.add(tag); + + UnsignedInteger deliveryId = _unsettledIds.get(firstTag).getDeliveryId(); + + UnsignedInteger first = deliveryId; + UnsignedInteger last = first; + + while(iter.hasNext()) + { + tag = iter.next(); + tagsToUpdate.add(tag); + + deliveryId = _unsettledIds.get(tag).getDeliveryId(); + + if(deliveryId.equals(last.add(UnsignedInteger.ONE))) + { + last = deliveryId; + } + else + { + ranges.put(first,last); + first = last = deliveryId; + } + + if(tag.equals(lastTag)) + { + break; + } + } + + ranges.put(first,last); + } + + if(settled) + { + + for(Binary deliveryTag : tagsToUpdate) + { + if(settled(deliveryTag) && _creditWindow) + { + setLinkCredit(getLinkCredit().add(UnsignedInteger.valueOf(1))); + } + } + sendFlowConditional(); + } + + + + for(Map.Entry<UnsignedInteger,UnsignedInteger> range : ranges.entrySet()) + { + getSession().updateDisposition(getRole(), range.getKey(), range.getValue(), state, settled); + } + + + getLock().notifyAll(); + } + + } + + @Override + public void settle(Binary deliveryTag) + { + super.settle(deliveryTag); + if(_creditWindow) + { + sendFlowConditional(); + } + + } + + public void flowStateChanged() + { + } + + public UnsignedInteger getDrainLimit() + { + return _drainLimit; + } + + UnsignedInteger getLastDeliveryId() + { + return _lastDeliveryId; + } + + void setLastDeliveryId(UnsignedInteger lastDeliveryId) + { + _lastDeliveryId = lastDeliveryId; + } + + +} |