summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java')
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java428
1 files changed, 214 insertions, 214 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
index a53b3661be..16e198e957 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
@@ -1,214 +1,214 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.transport;
-
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.transport.Attach;
-import org.apache.qpid.amqp_1_0.type.transport.Flow;
-import org.apache.qpid.amqp_1_0.type.transport.Role;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener>
-{
-
- private UnsignedInteger _lastDeliveryId;
- private Binary _lastDeliveryTag;
- private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<Binary, UnsignedInteger>();
- private Binary _transactionId;
-
- public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name)
- {
- this(sessionEndpoint, name, null);
- }
-
- public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
- {
- super(sessionEndpoint, name, unsettled);
- init();
- }
-
- public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, final Attach attach)
- {
- super(sessionEndpoint, attach);
- setSendingSettlementMode(attach.getSndSettleMode());
- setReceivingSettlementMode(attach.getRcvSettleMode());
- init();
- }
-
- private void init()
- {
- setDeliveryCount(UnsignedInteger.valueOf(0));
- setAvailable(UnsignedInteger.valueOf(0));
- setLinkEventListener(SendingLinkListener.DEFAULT);
- }
-
- @Override public Role getRole()
- {
- return Role.SENDER;
- }
-
- public boolean transfer(final Transfer xfr)
- {
- SessionEndpoint s = getSession();
- int transferCount;
- transferCount = _lastDeliveryTag == null ? 1 : 1;
- xfr.setMessageFormat(UnsignedInteger.ZERO);
- synchronized(getLock())
- {
-
- final int currentCredit = getLinkCredit().intValue() - transferCount;
-
- if(currentCredit < 0)
- {
- return false;
- }
- else
- {
- setLinkCredit(UnsignedInteger.valueOf((int)currentCredit));
- }
-
- setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + transferCount)));
-
- xfr.setHandle(getLocalHandle());
-
- s.sendTransfer(xfr, this, !xfr.getDeliveryTag().equals(_lastDeliveryTag));
-
- if(!Boolean.TRUE.equals(xfr.getSettled()))
- {
- _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId());
- }
- }
-
- if(Boolean.TRUE.equals(xfr.getMore()))
- {
- _lastDeliveryTag = xfr.getDeliveryTag();
- }
- else
- {
- _lastDeliveryTag = null;
- }
-
- return true;
- }
-
-
- public void drained()
- {
- synchronized (getLock())
- {
- setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
- setLinkCredit(UnsignedInteger.ZERO);
- sendFlow();
- }
- }
-
- @Override
- public void receiveFlow(final Flow flow)
- {
- super.receiveFlow(flow);
- UnsignedInteger t = flow.getDeliveryCount();
- UnsignedInteger c = flow.getLinkCredit();
- setDrain(flow.getDrain());
-
- Map options;
- if((options = flow.getProperties()) != null)
- {
- _transactionId = (Binary) options.get(Symbol.valueOf("txn-id"));
- }
-
- if(t == null)
- {
- setLinkCredit(c);
- }
- else
- {
- UnsignedInteger limit = t.add(c);
- if(limit.compareTo(getDeliveryCount())<=0)
- {
- setLinkCredit(UnsignedInteger.valueOf(0));
- }
- else
- {
- setLinkCredit(limit.subtract(getDeliveryCount()));
- }
- }
- getLinkEventListener().flowStateChanged();
-
- }
-
- @Override
- public void flowStateChanged()
- {
- getLinkEventListener().flowStateChanged();
- }
-
- public boolean hasCreditToSend()
- {
- UnsignedInteger linkCredit = getLinkCredit();
- return linkCredit != null && (linkCredit.compareTo(UnsignedInteger.valueOf(0)) > 0)
- && getSession().hasCreditToSend();
- }
-
- public void receiveDeliveryState(final Delivery unsettled,
- final DeliveryState state,
- final Boolean settled)
- {
- super.receiveDeliveryState(unsettled, state, settled);
- if(settled)
- {
- _unsettledMap.remove(unsettled.getDeliveryTag());
- }
- }
-
- public UnsignedInteger getLastDeliveryId()
- {
- return _lastDeliveryId;
- }
-
- public void setLastDeliveryId(final UnsignedInteger deliveryId)
- {
- _lastDeliveryId = deliveryId;
- }
-
- public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
- {
- synchronized(getLock())
- {
- UnsignedInteger deliveryId;
- if(settled && (deliveryId = _unsettledMap.remove(deliveryTag))!=null)
- {
- settle(deliveryTag);
- getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
- }
-
- }
- }
-
- public Binary getTransactionId()
- {
- return _transactionId;
- }
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.transport.Attach;
+import org.apache.qpid.amqp_1_0.type.transport.Flow;
+import org.apache.qpid.amqp_1_0.type.transport.Role;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener>
+{
+
+ private UnsignedInteger _lastDeliveryId;
+ private Binary _lastDeliveryTag;
+ private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<Binary, UnsignedInteger>();
+ private Binary _transactionId;
+
+ public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name)
+ {
+ this(sessionEndpoint, name, null);
+ }
+
+ public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
+ {
+ super(sessionEndpoint, name, unsettled);
+ init();
+ }
+
+ public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, final Attach attach)
+ {
+ super(sessionEndpoint, attach);
+ setSendingSettlementMode(attach.getSndSettleMode());
+ setReceivingSettlementMode(attach.getRcvSettleMode());
+ init();
+ }
+
+ private void init()
+ {
+ setDeliveryCount(UnsignedInteger.valueOf(0));
+ setAvailable(UnsignedInteger.valueOf(0));
+ setLinkEventListener(SendingLinkListener.DEFAULT);
+ }
+
+ @Override public Role getRole()
+ {
+ return Role.SENDER;
+ }
+
+ public boolean transfer(final Transfer xfr)
+ {
+ SessionEndpoint s = getSession();
+ int transferCount;
+ transferCount = _lastDeliveryTag == null ? 1 : 1;
+ xfr.setMessageFormat(UnsignedInteger.ZERO);
+ synchronized(getLock())
+ {
+
+ final int currentCredit = getLinkCredit().intValue() - transferCount;
+
+ if(currentCredit < 0)
+ {
+ return false;
+ }
+ else
+ {
+ setLinkCredit(UnsignedInteger.valueOf((int)currentCredit));
+ }
+
+ setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + transferCount)));
+
+ xfr.setHandle(getLocalHandle());
+
+ s.sendTransfer(xfr, this, !xfr.getDeliveryTag().equals(_lastDeliveryTag));
+
+ if(!Boolean.TRUE.equals(xfr.getSettled()))
+ {
+ _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId());
+ }
+ }
+
+ if(Boolean.TRUE.equals(xfr.getMore()))
+ {
+ _lastDeliveryTag = xfr.getDeliveryTag();
+ }
+ else
+ {
+ _lastDeliveryTag = null;
+ }
+
+ return true;
+ }
+
+
+ public void drained()
+ {
+ synchronized (getLock())
+ {
+ setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+ setLinkCredit(UnsignedInteger.ZERO);
+ sendFlow();
+ }
+ }
+
+ @Override
+ public void receiveFlow(final Flow flow)
+ {
+ super.receiveFlow(flow);
+ UnsignedInteger t = flow.getDeliveryCount();
+ UnsignedInteger c = flow.getLinkCredit();
+ setDrain(flow.getDrain());
+
+ Map options;
+ if((options = flow.getProperties()) != null)
+ {
+ _transactionId = (Binary) options.get(Symbol.valueOf("txn-id"));
+ }
+
+ if(t == null)
+ {
+ setLinkCredit(c);
+ }
+ else
+ {
+ UnsignedInteger limit = t.add(c);
+ if(limit.compareTo(getDeliveryCount())<=0)
+ {
+ setLinkCredit(UnsignedInteger.valueOf(0));
+ }
+ else
+ {
+ setLinkCredit(limit.subtract(getDeliveryCount()));
+ }
+ }
+ getLinkEventListener().flowStateChanged();
+
+ }
+
+ @Override
+ public void flowStateChanged()
+ {
+ getLinkEventListener().flowStateChanged();
+ }
+
+ public boolean hasCreditToSend()
+ {
+ UnsignedInteger linkCredit = getLinkCredit();
+ return linkCredit != null && (linkCredit.compareTo(UnsignedInteger.valueOf(0)) > 0)
+ && getSession().hasCreditToSend();
+ }
+
+ public void receiveDeliveryState(final Delivery unsettled,
+ final DeliveryState state,
+ final Boolean settled)
+ {
+ super.receiveDeliveryState(unsettled, state, settled);
+ if(settled)
+ {
+ _unsettledMap.remove(unsettled.getDeliveryTag());
+ }
+ }
+
+ public UnsignedInteger getLastDeliveryId()
+ {
+ return _lastDeliveryId;
+ }
+
+ public void setLastDeliveryId(final UnsignedInteger deliveryId)
+ {
+ _lastDeliveryId = deliveryId;
+ }
+
+ public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
+ {
+ synchronized(getLock())
+ {
+ UnsignedInteger deliveryId;
+ if(settled && (deliveryId = _unsettledMap.remove(deliveryTag))!=null)
+ {
+ settle(deliveryTag);
+ getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
+ }
+
+ }
+ }
+
+ public Binary getTransactionId()
+ {
+ return _transactionId;
+ }
+
+}