summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java')
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java1603
1 files changed, 806 insertions, 797 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index 17f90fef59..0ab2e1a9b7 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -1,797 +1,806 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.transport;
-
-import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.amqp_1_0.type.transaction.*;
-import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-public class SessionEndpoint
-{
- private SessionState _state = SessionState.INACTIVE;
-
- private final Map<String, LinkEndpoint> _linkMap = new HashMap<String, LinkEndpoint>();
- private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<LinkEndpoint, UnsignedInteger>();
- private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<UnsignedInteger, LinkEndpoint>();
-
- private long _timeout;
-
-
- private ConnectionEndpoint _connection;
- private long _lastAttachedTime;
-
- private short _receivingChannel;
- private short _sendingChannel;
-
- private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled;
- private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled;
-
- // has to be a power of two
- private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
- private static final int BUFFER_SIZE_MASK = DEFAULT_SESSION_BUFFER_SIZE - 1;
-
- private SequenceNumber _nextIncomingTransferId;
- private SequenceNumber _nextOutgoingTransferId;
-
- private int _nextOutgoingDeliveryId;
-
- //private SequenceNumber _incomingLWM;
- //private SequenceNumber _outgoingLWM;
-
- private UnsignedInteger _outgoingSessionCredit;
-
-
-
- private UnsignedInteger _initialOutgoingId;
-
- private SessionEventListener _sessionEventListener = SessionEventListener.DEFAULT;
-
- private int _availableIncomingCredit;
- private int _availableOutgoingCredit;
- private UnsignedInteger _lastSentIncomingLimit;
-
- public SessionEndpoint(final ConnectionEndpoint connectionEndpoint)
- {
- this(connectionEndpoint, UnsignedInteger.valueOf(0));
- }
-
- public SessionEndpoint(final ConnectionEndpoint connectionEndpoint, Begin begin)
- {
- this(connectionEndpoint, UnsignedInteger.valueOf(0));
- _state = SessionState.BEGIN_RECVD;
- _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
- }
-
-
- public SessionEndpoint(final ConnectionEndpoint connectionEndpoint, UnsignedInteger nextOutgoingId)
- {
- _connection = connectionEndpoint;
-
- _initialOutgoingId = nextOutgoingId;
- _nextOutgoingTransferId = new SequenceNumber(nextOutgoingId.intValue());
-
- _outgoingUnsettled = new LinkedHashMap<UnsignedInteger,Delivery>(DEFAULT_SESSION_BUFFER_SIZE);
- _incomingUnsettled = new LinkedHashMap<UnsignedInteger, Delivery>(DEFAULT_SESSION_BUFFER_SIZE);
- _availableIncomingCredit = DEFAULT_SESSION_BUFFER_SIZE;
- _availableOutgoingCredit = DEFAULT_SESSION_BUFFER_SIZE;
- }
-
-
- public void setReceivingChannel(final short receivingChannel)
- {
- _receivingChannel = receivingChannel;
- switch(_state)
- {
- case INACTIVE:
- _state = SessionState.BEGIN_RECVD;
- break;
- case BEGIN_SENT:
- _state = SessionState.ACTIVE;
- break;
- default:
- // TODO error
-
- }
- }
-
-
- public void setSendingChannel(final short sendingChannel)
- {
- _sendingChannel = sendingChannel;
- switch(_state)
- {
- case INACTIVE:
- _state = SessionState.BEGIN_SENT;
- break;
- case BEGIN_RECVD:
- _state = SessionState.ACTIVE;
- break;
- default:
- // TODO error
-
- }
- }
-
- public SessionState getState()
- {
- return _state;
- }
-
- public void end()
- {
- end(null);
- }
-
- public void end(final End end)
- {
- synchronized(getLock())
- {
- switch(_state)
- {
- case END_SENT:
- _state = SessionState.ENDED;
- break;
- case ACTIVE:
- detachLinks();
- _sessionEventListener.remoteEnd(end);
- short sendChannel = getSendingChannel();
- _connection.sendEnd(sendChannel, new End());
- _state = end == null ? SessionState.END_SENT : SessionState.ENDED;
- break;
- default:
- sendChannel = getSendingChannel();
- End reply = new End();
- Error error = new Error();
- error.setCondition(AmqpError.ILLEGAL_STATE);
- error.setDescription("END called on Session which has not been opened");
- reply.setError(error);
- _connection.sendEnd(sendChannel, reply);
- break;
-
-
- }
- getLock().notifyAll();
- }
- }
-
- private void detachLinks()
- {
- Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet());
- for(UnsignedInteger handle : handles)
- {
- detach(handle, null);
- }
- }
-
- public short getSendingChannel()
- {
- return _sendingChannel;
- }
-
-
- public void receiveAttach(final Attach attach)
- {
- if(_state == SessionState.ACTIVE)
- {
- UnsignedInteger handle = attach.getHandle();
- if(_remoteLinkEndpoints.containsKey(handle))
- {
- // TODO - Error - handle busy?
- }
- else
- {
- LinkEndpoint endpoint = getLinkMap().get(attach.getName());
- if(endpoint == null)
- {
- endpoint = attach.getRole() == Role.RECEIVER
- ? new SendingLinkEndpoint(this, attach)
- : new ReceivingLinkEndpoint(this, attach);
-
- // TODO : fix below - distinguish between local and remote owned
- endpoint.setSource(attach.getSource());
- endpoint.setTarget(attach.getTarget());
-
-
- }
-
- if(attach.getRole() == Role.SENDER)
- {
- endpoint.setDeliveryCount(attach.getInitialDeliveryCount());
- }
-
- _remoteLinkEndpoints.put(handle, endpoint);
-
- if(!_localLinkEndpoints.containsKey(endpoint))
- {
- UnsignedInteger localHandle = findNextAvailableHandle();
- endpoint.setLocalHandle(localHandle);
- _localLinkEndpoints.put(endpoint, localHandle);
-
- _sessionEventListener.remoteLinkCreation(endpoint);
-
-
- }
- else
- {
- endpoint.receiveAttach(attach);
- }
- }
- }
- }
-
- private void send(final FrameBody frameBody)
- {
- _connection.send(this.getSendingChannel(), frameBody);
- }
-
-
- private int send(final FrameBody frameBody, ByteBuffer payload)
- {
- return _connection.send(this.getSendingChannel(), frameBody, payload);
- }
-
- private UnsignedInteger findNextAvailableHandle()
- {
- int i = 0;
- do
- {
- if(!_localLinkEndpoints.containsValue(UnsignedInteger.valueOf(i)))
- {
- return UnsignedInteger.valueOf(i);
- }
- } while(++i != 0);
-
- // TODO
- throw new RuntimeException();
- }
-
- public void receiveDetach(final Detach detach)
- {
- UnsignedInteger handle = detach.getHandle();
- detach(handle, detach);
- }
-
- private void detach(UnsignedInteger handle, Detach detach)
- {
- if(_remoteLinkEndpoints.containsKey(handle))
- {
- LinkEndpoint endpoint = _remoteLinkEndpoints.remove(handle);
-
- endpoint.remoteDetached(detach);
-
- _localLinkEndpoints.remove(endpoint);
-
-
- }
- else
- {
- // TODO
- }
- }
-
- public void receiveTransfer(final Transfer transfer)
- {
- synchronized(getLock())
- {
- _nextIncomingTransferId.incr();
-/*
- _availableIncomingCredit--;
-*/
-
- UnsignedInteger handle = transfer.getHandle();
-
-
-
- LinkEndpoint endpoint = _remoteLinkEndpoints.get(handle);
-
- if(endpoint == null)
- {
- //TODO - error unknown link
- System.err.println("Unknown endpoint " + transfer);
-
- }
-
- UnsignedInteger deliveryId = transfer.getDeliveryId();
- if(deliveryId == null)
- {
- deliveryId = ((ReceivingLinkEndpoint)endpoint).getLastDeliveryId();
- }
-
- Delivery delivery = _incomingUnsettled.get(deliveryId);
- if(delivery == null)
- {
- delivery = new Delivery(transfer, endpoint);
- _incomingUnsettled.put(deliveryId,delivery);
- if(delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted()))
- {
-/*
- _availableIncomingCredit++;
-*/
- }
-
- if(Boolean.TRUE.equals(transfer.getMore()))
- {
- ((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(transfer.getDeliveryId());
- }
- }
- else
- {
- if(delivery.getDeliveryId().equals(deliveryId))
- {
- delivery.addTransfer(transfer);
- if(delivery.isSettled())
- {
-/*
- _availableIncomingCredit++;
-*/
- }
- else if(Boolean.TRUE.equals(transfer.getAborted()))
- {
-/*
- _availableIncomingCredit += delivery.getTransfers().size();
-*/
- }
-
- if(!Boolean.TRUE.equals(transfer.getMore()))
- {
- ((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(null);
- }
- }
- else
- {
- // TODO - error
- System.err.println("Incorrect transfer id " + transfer);
- }
- }
-
- if(endpoint != null)
- {
- endpoint.receiveTransfer(transfer, delivery);
- }
-
- if((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
- {
- _incomingUnsettled.remove(deliveryId);
- }
- }
- }
-
- public void receiveFlow(final Flow flow)
- {
-
- synchronized(getLock())
- {
- UnsignedInteger handle = flow.getHandle();
- LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
-
- final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
- int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
- _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
-
- if(endpoint != null)
- {
- endpoint.receiveFlow( flow );
- }
- else
- {
- for(LinkEndpoint le : _remoteLinkEndpoints.values())
- {
- le.flowStateChanged();
- }
- }
-
- getLock().notifyAll();
- }
-
-
- }
-
- public void receiveDisposition(final Disposition disposition)
- {
- Role dispositionRole = disposition.getRole();
-
- LinkedHashMap<UnsignedInteger, Delivery> unsettledTransfers;
-
- if(dispositionRole == Role.RECEIVER)
- {
- unsettledTransfers = _outgoingUnsettled;
- }
- else
- {
- unsettledTransfers = _incomingUnsettled;
-
- }
-
- UnsignedInteger deliveryId = disposition.getFirst();
- UnsignedInteger last = disposition.getLast();
- if(last == null)
- {
- last = deliveryId;
- }
-
-
- while(deliveryId.compareTo(last)<=0)
- {
-
- Delivery delivery = unsettledTransfers.get(deliveryId);
- if(delivery != null)
- {
- delivery.getLinkEndpoint().receiveDeliveryState(delivery,
- disposition.getState(),
- disposition.getSettled());
- }
- deliveryId = deliveryId.add(UnsignedInteger.ONE);
- }
- if(disposition.getSettled())
- {
- checkSendFlow();
- }
-
- }
-
- private void checkSendFlow()
- {
- //TODO
- }
-
- public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr, final String sourceAddr)
- {
- return createSendingLinkEndpoint(name, targetAddr, sourceAddr, null);
- }
-
- public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr, final String sourceAddr, Map<Binary, Outcome> unsettled)
- {
- return createSendingLinkEndpoint(name, targetAddr, sourceAddr, false, unsettled);
- }
-
- public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr,
- final String sourceAddr, boolean durable,
- Map<Binary, Outcome> unsettled)
- {
-
- Source source = new Source();
- source.setAddress(sourceAddr);
- Target target = new Target();
- target.setAddress(targetAddr);
- if(durable)
- {
- target.setDurable(TerminusDurability.UNSETTLED_STATE);
- target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
- }
-
- return createSendingLinkEndpoint(name, source, target, unsettled);
-
- }
-
- public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target)
- {
- return createSendingLinkEndpoint(name, source, target, null);
- }
-
- public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target, Map<Binary, Outcome> unsettled)
- {
- SendingLinkEndpoint endpoint = new SendingLinkEndpoint(this, name, unsettled);
- endpoint.setSource(source);
- endpoint.setTarget(target);
- UnsignedInteger handle = findNextAvailableHandle();
- _localLinkEndpoints.put(endpoint, handle);
- endpoint.setLocalHandle(handle);
- getLinkMap().put(name, endpoint);
-
- return endpoint;
- }
-
- public void sendAttach(Attach attach)
- {
- send(attach);
- }
-
- public void sendTransfer(final Transfer xfr, SendingLinkEndpoint endpoint, boolean newDelivery)
- {
- _nextOutgoingTransferId.incr();
- UnsignedInteger deliveryId;
- if(newDelivery)
- {
- deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
- endpoint.setLastDeliveryId(deliveryId);
- }
- else
- {
- deliveryId = endpoint.getLastDeliveryId();
- }
- xfr.setDeliveryId(deliveryId);
-
- if(!Boolean.TRUE.equals(xfr.getSettled()))
- {
- Delivery delivery;
- if((delivery = _outgoingUnsettled.get(deliveryId))== null)
- {
- delivery = new Delivery(xfr, endpoint);
- _outgoingUnsettled.put(deliveryId, delivery);
-
- }
- else
- {
- delivery.addTransfer(xfr);
- }
- _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
- endpoint.addUnsettled(delivery);
-
- }
-
- try
- {
- ByteBuffer payload = xfr.getPayload();
- int payloadSent = send(xfr, payload);
-
- if(payload != null && payloadSent < payload.remaining())
- {
- payload = payload.duplicate();
-try
-{
- payload.position(payload.position()+payloadSent);
-}
-catch(IllegalArgumentException e)
-{
- System.err.println("UNEXPECTED");
- System.err.println("Payload Position: " + payload.position());
- System.err.println("Payload Sent: " + payloadSent);
- System.err.println("Payload Remaining: " + payload.remaining());
- throw e;
-
-}
-
- Transfer secondTransfer = new Transfer();
-
- secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
- secondTransfer.setHandle(xfr.getHandle());
- secondTransfer.setSettled(xfr.getSettled());
- secondTransfer.setState(xfr.getState());
- secondTransfer.setMessageFormat(xfr.getMessageFormat());
- secondTransfer.setPayload(payload);
-
- sendTransfer(secondTransfer, endpoint, false);
-
- }
- }
- catch(OversizeFrameException e)
- {
- e.printStackTrace();
- }
-
- }
-
- public Object getLock()
- {
- return _connection.getLock();
- }
-
- public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
- String targetAddr,
- String sourceAddr,
- UnsignedInteger initialCredit,
- final DistributionMode distributionMode)
- {
- Source source = new Source();
- source.setAddress(sourceAddr);
- source.setDistributionMode(distributionMode);
- Target target = new Target();
- target.setAddress(targetAddr);
-
- return createReceivingLinkEndpoint(name, target, source, initialCredit);
- }
-
- public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
- Target target,
- Source source,
- UnsignedInteger initialCredit)
- {
- ReceivingLinkEndpoint endpoint = new ReceivingLinkEndpoint(this, name);
- endpoint.setLinkCredit(initialCredit);
- endpoint.setSource(source);
- endpoint.setTarget(target);
- UnsignedInteger handle = findNextAvailableHandle();
- _localLinkEndpoints.put(endpoint, handle);
- endpoint.setLocalHandle(handle);
- getLinkMap().put(name, endpoint);
-
- return endpoint;
-
- }
-
- public void updateDisposition(final Role role,
- final UnsignedInteger first,
- final UnsignedInteger last,
- final DeliveryState state,
- final boolean settled)
- {
-
-
- Disposition disposition = new Disposition();
- disposition.setRole(role);
- disposition.setFirst(first);
- disposition.setLast(last);
- disposition.setSettled(settled);
-
- disposition.setState(state);
-
-
- if(settled)
- {
- if(role == Role.RECEIVER)
- {
- SequenceNumber pos = new SequenceNumber(first.intValue());
- SequenceNumber end = new SequenceNumber(last.intValue());
- while(pos.compareTo(end)<=0)
- {
- Delivery d = _incomingUnsettled.remove(new UnsignedInteger(pos.intValue()));
-
-/*
- _availableIncomingCredit += d.getTransfers().size();
-*/
-
- pos.incr();
- }
- }
- }
-
- send(disposition);
- checkSendFlow();
- }
-
- public void settle(Role role, final UnsignedInteger deliveryId)
- {
- if(role == Role.RECEIVER)
- {
- Delivery d = _incomingUnsettled.remove(deliveryId);
- if(d != null)
- {
-/*
- _availableIncomingCredit += d.getTransfers().size();
-*/
- }
- }
- else
- {
- Delivery d = _outgoingUnsettled.remove(deliveryId);
-/* if(d != null)
- {
- _availableOutgoingCredit += d.getTransfers().size();
-
- }*/
- }
-
- }
-
- public void sendFlow()
- {
- sendFlow(new Flow());
- }
- public void sendFlow(final Flow flow)
- {
- final int nextIncomingId = _nextIncomingTransferId.intValue();
- flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
- flow.setIncomingWindow(UnsignedInteger.valueOf(_availableIncomingCredit));
- _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _availableIncomingCredit);
-
- flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue()));
- flow.setOutgoingWindow(UnsignedInteger.valueOf(_availableOutgoingCredit));
- send(flow);
- }
-
- public void sendFlowConditional()
- {
- UnsignedInteger clientsCredit = _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingTransferId.intValue()));
- int i = UnsignedInteger.valueOf(_availableIncomingCredit).subtract(clientsCredit).compareTo(clientsCredit);
- if(i >=0)
- {
- sendFlow();
- }
-
- }
-
- public void sendDetach(Detach detach)
- {
- send(detach);
-
- }
-
- void doEnd(End end)
- {
- }
-
- public void setNextIncomingId(final UnsignedInteger nextIncomingId)
- {
- _nextIncomingTransferId = new SequenceNumber(nextIncomingId.intValue());
-
- }
-
- public void setOutgoingSessionCredit(final UnsignedInteger outgoingSessionCredit)
- {
- _outgoingSessionCredit = outgoingSessionCredit;
- }
-
- public UnsignedInteger getNextOutgoingId()
- {
- return UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue());
- }
-
- public UnsignedInteger getOutgoingWindowSize()
- {
- return UnsignedInteger.valueOf(_availableOutgoingCredit);
- }
-
- public boolean hasCreditToSend()
- {
- boolean b = _outgoingSessionCredit != null && _outgoingSessionCredit.intValue() > 0;
- boolean b1 = getOutgoingWindowSize() != null && getOutgoingWindowSize().compareTo(UnsignedInteger.ZERO) > 0;
- return b && b1;
- }
-
- public UnsignedInteger getIncomingWindowSize()
- {
- return UnsignedInteger.valueOf(_availableIncomingCredit);
- }
-
- public SessionEventListener getSessionEventListener()
- {
- return _sessionEventListener;
- }
-
- public void setSessionEventListener(final SessionEventListener sessionEventListener)
- {
- _sessionEventListener = sessionEventListener;
- }
-
- public ConnectionEndpoint getConnection()
- {
- return _connection;
- }
-
- public SendingLinkEndpoint createTransactionController(String name, TxnCapability... capabilities)
- {
- Coordinator coordinator = new Coordinator();
- coordinator.setCapabilities(capabilities);
-
- Source src = new Source();
-
- return createSendingLinkEndpoint(name, src, coordinator);
- }
-
- Map<String, LinkEndpoint> getLinkMap()
- {
- return _linkMap;
- }
-
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.amqp_1_0.type.transaction.*;
+import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class SessionEndpoint
+{
+ private SessionState _state = SessionState.INACTIVE;
+
+ private final Map<String, LinkEndpoint> _linkMap = new HashMap<String, LinkEndpoint>();
+ private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<LinkEndpoint, UnsignedInteger>();
+ private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<UnsignedInteger, LinkEndpoint>();
+
+ private long _timeout;
+
+
+ private ConnectionEndpoint _connection;
+ private long _lastAttachedTime;
+
+ private short _receivingChannel;
+ private short _sendingChannel;
+
+ private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled;
+ private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled;
+
+ // has to be a power of two
+ private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
+ private static final int BUFFER_SIZE_MASK = DEFAULT_SESSION_BUFFER_SIZE - 1;
+
+ private SequenceNumber _nextIncomingTransferId;
+ private SequenceNumber _nextOutgoingTransferId;
+
+ private int _nextOutgoingDeliveryId;
+
+ //private SequenceNumber _incomingLWM;
+ //private SequenceNumber _outgoingLWM;
+
+ private UnsignedInteger _outgoingSessionCredit;
+
+
+
+ private UnsignedInteger _initialOutgoingId;
+
+ private SessionEventListener _sessionEventListener = SessionEventListener.DEFAULT;
+
+ private int _availableIncomingCredit;
+ private int _availableOutgoingCredit;
+ private UnsignedInteger _lastSentIncomingLimit;
+
+ public SessionEndpoint(final ConnectionEndpoint connectionEndpoint)
+ {
+ this(connectionEndpoint, UnsignedInteger.valueOf(0));
+ }
+
+ public SessionEndpoint(final ConnectionEndpoint connectionEndpoint, Begin begin)
+ {
+ this(connectionEndpoint, UnsignedInteger.valueOf(0));
+ _state = SessionState.BEGIN_RECVD;
+ _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
+ }
+
+
+ public SessionEndpoint(final ConnectionEndpoint connectionEndpoint, UnsignedInteger nextOutgoingId)
+ {
+ _connection = connectionEndpoint;
+
+ _initialOutgoingId = nextOutgoingId;
+ _nextOutgoingTransferId = new SequenceNumber(nextOutgoingId.intValue());
+
+ _outgoingUnsettled = new LinkedHashMap<UnsignedInteger,Delivery>(DEFAULT_SESSION_BUFFER_SIZE);
+ _incomingUnsettled = new LinkedHashMap<UnsignedInteger, Delivery>(DEFAULT_SESSION_BUFFER_SIZE);
+ _availableIncomingCredit = DEFAULT_SESSION_BUFFER_SIZE;
+ _availableOutgoingCredit = DEFAULT_SESSION_BUFFER_SIZE;
+ }
+
+
+ public void setReceivingChannel(final short receivingChannel)
+ {
+ _receivingChannel = receivingChannel;
+ switch(_state)
+ {
+ case INACTIVE:
+ _state = SessionState.BEGIN_RECVD;
+ break;
+ case BEGIN_SENT:
+ _state = SessionState.ACTIVE;
+ break;
+ default:
+ // TODO error
+
+ }
+ }
+
+
+ public void setSendingChannel(final short sendingChannel)
+ {
+ _sendingChannel = sendingChannel;
+ switch(_state)
+ {
+ case INACTIVE:
+ _state = SessionState.BEGIN_SENT;
+ break;
+ case BEGIN_RECVD:
+ _state = SessionState.ACTIVE;
+ break;
+ default:
+ // TODO error
+
+ }
+ }
+
+ public SessionState getState()
+ {
+ return _state;
+ }
+
+ public void end()
+ {
+ end(null);
+ }
+
+ public void end(final End end)
+ {
+ synchronized(getLock())
+ {
+ switch(_state)
+ {
+ case END_SENT:
+ _state = SessionState.ENDED;
+ break;
+ case ACTIVE:
+ detachLinks();
+ _sessionEventListener.remoteEnd(end);
+ short sendChannel = getSendingChannel();
+ _connection.sendEnd(sendChannel, new End());
+ _state = end == null ? SessionState.END_SENT : SessionState.ENDED;
+ break;
+ default:
+ sendChannel = getSendingChannel();
+ End reply = new End();
+ Error error = new Error();
+ error.setCondition(AmqpError.ILLEGAL_STATE);
+ error.setDescription("END called on Session which has not been opened");
+ reply.setError(error);
+ _connection.sendEnd(sendChannel, reply);
+ break;
+
+
+ }
+ getLock().notifyAll();
+ }
+ }
+
+ private void detachLinks()
+ {
+ Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet());
+ for(UnsignedInteger handle : handles)
+ {
+ detach(handle, null);
+ }
+ }
+
+ public short getSendingChannel()
+ {
+ return _sendingChannel;
+ }
+
+
+ public void receiveAttach(final Attach attach)
+ {
+ if(_state == SessionState.ACTIVE)
+ {
+ UnsignedInteger handle = attach.getHandle();
+ if(_remoteLinkEndpoints.containsKey(handle))
+ {
+ // TODO - Error - handle busy?
+ }
+ else
+ {
+ LinkEndpoint endpoint = getLinkMap().get(attach.getName());
+ if(endpoint == null)
+ {
+ endpoint = attach.getRole() == Role.RECEIVER
+ ? new SendingLinkEndpoint(this, attach)
+ : new ReceivingLinkEndpoint(this, attach);
+
+ // TODO : fix below - distinguish between local and remote owned
+ endpoint.setSource(attach.getSource());
+ endpoint.setTarget(attach.getTarget());
+
+
+ }
+
+ if(attach.getRole() == Role.SENDER)
+ {
+ endpoint.setDeliveryCount(attach.getInitialDeliveryCount());
+ }
+
+ _remoteLinkEndpoints.put(handle, endpoint);
+
+ if(!_localLinkEndpoints.containsKey(endpoint))
+ {
+ UnsignedInteger localHandle = findNextAvailableHandle();
+ endpoint.setLocalHandle(localHandle);
+ _localLinkEndpoints.put(endpoint, localHandle);
+
+ _sessionEventListener.remoteLinkCreation(endpoint);
+
+
+ }
+ else
+ {
+ endpoint.receiveAttach(attach);
+ }
+ }
+ }
+ }
+
+ private void send(final FrameBody frameBody)
+ {
+ _connection.send(this.getSendingChannel(), frameBody);
+ }
+
+
+ private int send(final FrameBody frameBody, ByteBuffer payload)
+ {
+ return _connection.send(this.getSendingChannel(), frameBody, payload);
+ }
+
+ private UnsignedInteger findNextAvailableHandle()
+ {
+ int i = 0;
+ do
+ {
+ if(!_localLinkEndpoints.containsValue(UnsignedInteger.valueOf(i)))
+ {
+ return UnsignedInteger.valueOf(i);
+ }
+ } while(++i != 0);
+
+ // TODO
+ throw new RuntimeException();
+ }
+
+ public void receiveDetach(final Detach detach)
+ {
+ UnsignedInteger handle = detach.getHandle();
+ detach(handle, detach);
+ }
+
+ private void detach(UnsignedInteger handle, Detach detach)
+ {
+ if(_remoteLinkEndpoints.containsKey(handle))
+ {
+ LinkEndpoint endpoint = _remoteLinkEndpoints.remove(handle);
+
+ endpoint.remoteDetached(detach);
+
+ _localLinkEndpoints.remove(endpoint);
+
+
+ }
+ else
+ {
+ // TODO
+ }
+ }
+
+ public void receiveTransfer(final Transfer transfer)
+ {
+ synchronized(getLock())
+ {
+ _nextIncomingTransferId.incr();
+/*
+ _availableIncomingCredit--;
+*/
+
+ UnsignedInteger handle = transfer.getHandle();
+
+
+
+ LinkEndpoint endpoint = _remoteLinkEndpoints.get(handle);
+
+ if(endpoint == null)
+ {
+ //TODO - error unknown link
+ System.err.println("Unknown endpoint " + transfer);
+
+ }
+
+ UnsignedInteger deliveryId = transfer.getDeliveryId();
+ if(deliveryId == null)
+ {
+ deliveryId = ((ReceivingLinkEndpoint)endpoint).getLastDeliveryId();
+ }
+
+ Delivery delivery = _incomingUnsettled.get(deliveryId);
+ if(delivery == null)
+ {
+ delivery = new Delivery(transfer, endpoint);
+ _incomingUnsettled.put(deliveryId,delivery);
+ if(delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted()))
+ {
+/*
+ _availableIncomingCredit++;
+*/
+ }
+
+ if(Boolean.TRUE.equals(transfer.getMore()))
+ {
+ ((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(transfer.getDeliveryId());
+ }
+ }
+ else
+ {
+ if(delivery.getDeliveryId().equals(deliveryId))
+ {
+ delivery.addTransfer(transfer);
+ if(delivery.isSettled())
+ {
+/*
+ _availableIncomingCredit++;
+*/
+ }
+ else if(Boolean.TRUE.equals(transfer.getAborted()))
+ {
+/*
+ _availableIncomingCredit += delivery.getTransfers().size();
+*/
+ }
+
+ if(!Boolean.TRUE.equals(transfer.getMore()))
+ {
+ ((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(null);
+ }
+ }
+ else
+ {
+ // TODO - error
+ System.err.println("Incorrect transfer id " + transfer);
+ }
+ }
+
+ if(endpoint != null)
+ {
+ endpoint.receiveTransfer(transfer, delivery);
+ }
+
+ if((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
+ {
+ _incomingUnsettled.remove(deliveryId);
+ }
+ }
+ }
+
+ public void receiveFlow(final Flow flow)
+ {
+
+ synchronized(getLock())
+ {
+ UnsignedInteger handle = flow.getHandle();
+ LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
+
+ final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
+ int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
+ _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
+
+ if(endpoint != null)
+ {
+ endpoint.receiveFlow( flow );
+ }
+ else
+ {
+ for(LinkEndpoint le : _remoteLinkEndpoints.values())
+ {
+ le.flowStateChanged();
+ }
+ }
+
+ getLock().notifyAll();
+ }
+
+
+ }
+
+ public void receiveDisposition(final Disposition disposition)
+ {
+ Role dispositionRole = disposition.getRole();
+
+ LinkedHashMap<UnsignedInteger, Delivery> unsettledTransfers;
+
+ if(dispositionRole == Role.RECEIVER)
+ {
+ unsettledTransfers = _outgoingUnsettled;
+ }
+ else
+ {
+ unsettledTransfers = _incomingUnsettled;
+
+ }
+
+ UnsignedInteger deliveryId = disposition.getFirst();
+ UnsignedInteger last = disposition.getLast();
+ if(last == null)
+ {
+ last = deliveryId;
+ }
+
+
+ while(deliveryId.compareTo(last)<=0)
+ {
+
+ Delivery delivery = unsettledTransfers.get(deliveryId);
+ if(delivery != null)
+ {
+ delivery.getLinkEndpoint().receiveDeliveryState(delivery,
+ disposition.getState(),
+ disposition.getSettled());
+ }
+ deliveryId = deliveryId.add(UnsignedInteger.ONE);
+ }
+ if(disposition.getSettled())
+ {
+ checkSendFlow();
+ }
+
+ }
+
+ private void checkSendFlow()
+ {
+ //TODO
+ }
+
+ public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr, final String sourceAddr)
+ {
+ return createSendingLinkEndpoint(name, targetAddr, sourceAddr, null);
+ }
+
+ public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr, final String sourceAddr, Map<Binary, Outcome> unsettled)
+ {
+ return createSendingLinkEndpoint(name, targetAddr, sourceAddr, false, unsettled);
+ }
+
+ public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr,
+ final String sourceAddr, boolean durable,
+ Map<Binary, Outcome> unsettled)
+ {
+
+ Source source = new Source();
+ source.setAddress(sourceAddr);
+ Target target = new Target();
+ target.setAddress(targetAddr);
+ if(durable)
+ {
+ target.setDurable(TerminusDurability.UNSETTLED_STATE);
+ target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ }
+
+ return createSendingLinkEndpoint(name, source, target, unsettled);
+
+ }
+
+ public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target)
+ {
+ return createSendingLinkEndpoint(name, source, target, null);
+ }
+
+ public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target, Map<Binary, Outcome> unsettled)
+ {
+ SendingLinkEndpoint endpoint = new SendingLinkEndpoint(this, name, unsettled);
+ endpoint.setSource(source);
+ endpoint.setTarget(target);
+ UnsignedInteger handle = findNextAvailableHandle();
+ _localLinkEndpoints.put(endpoint, handle);
+ endpoint.setLocalHandle(handle);
+ getLinkMap().put(name, endpoint);
+
+ return endpoint;
+ }
+
+ public void sendAttach(Attach attach)
+ {
+ send(attach);
+ }
+
+ public void sendTransfer(final Transfer xfr, SendingLinkEndpoint endpoint, boolean newDelivery)
+ {
+ _nextOutgoingTransferId.incr();
+ UnsignedInteger deliveryId;
+ if(newDelivery)
+ {
+ deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
+ endpoint.setLastDeliveryId(deliveryId);
+ }
+ else
+ {
+ deliveryId = endpoint.getLastDeliveryId();
+ }
+ xfr.setDeliveryId(deliveryId);
+
+ if(!Boolean.TRUE.equals(xfr.getSettled()))
+ {
+ Delivery delivery;
+ if((delivery = _outgoingUnsettled.get(deliveryId))== null)
+ {
+ delivery = new Delivery(xfr, endpoint);
+ _outgoingUnsettled.put(deliveryId, delivery);
+
+ }
+ else
+ {
+ delivery.addTransfer(xfr);
+ }
+ _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+ endpoint.addUnsettled(delivery);
+
+ }
+
+ try
+ {
+ ByteBuffer payload = xfr.getPayload();
+ int payloadSent = send(xfr, payload);
+
+ if(payload != null && payloadSent < payload.remaining())
+ {
+ payload = payload.duplicate();
+try
+{
+ payload.position(payload.position()+payloadSent);
+}
+catch(IllegalArgumentException e)
+{
+ System.err.println("UNEXPECTED");
+ System.err.println("Payload Position: " + payload.position());
+ System.err.println("Payload Sent: " + payloadSent);
+ System.err.println("Payload Remaining: " + payload.remaining());
+ throw e;
+
+}
+
+ Transfer secondTransfer = new Transfer();
+
+ secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
+ secondTransfer.setHandle(xfr.getHandle());
+ secondTransfer.setSettled(xfr.getSettled());
+ secondTransfer.setState(xfr.getState());
+ secondTransfer.setMessageFormat(xfr.getMessageFormat());
+ secondTransfer.setPayload(payload);
+
+ sendTransfer(secondTransfer, endpoint, false);
+
+ }
+ }
+ catch(OversizeFrameException e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ public Object getLock()
+ {
+ return _connection.getLock();
+ }
+
+ public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
+ String targetAddr,
+ String sourceAddr,
+ UnsignedInteger initialCredit,
+ final DistributionMode distributionMode)
+ {
+ Source source = new Source();
+ source.setAddress(sourceAddr);
+ source.setDistributionMode(distributionMode);
+ Target target = new Target();
+ target.setAddress(targetAddr);
+
+ return createReceivingLinkEndpoint(name, target, source, initialCredit);
+ }
+
+ public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
+ Target target,
+ Source source,
+ UnsignedInteger initialCredit)
+ {
+ ReceivingLinkEndpoint endpoint = new ReceivingLinkEndpoint(this, name);
+ endpoint.setLinkCredit(initialCredit);
+ endpoint.setSource(source);
+ endpoint.setTarget(target);
+ UnsignedInteger handle = findNextAvailableHandle();
+ _localLinkEndpoints.put(endpoint, handle);
+ endpoint.setLocalHandle(handle);
+ getLinkMap().put(name, endpoint);
+
+ return endpoint;
+
+ }
+
+ public void updateDisposition(final Role role,
+ final UnsignedInteger first,
+ final UnsignedInteger last,
+ final DeliveryState state,
+ final boolean settled)
+ {
+
+
+ Disposition disposition = new Disposition();
+ disposition.setRole(role);
+ disposition.setFirst(first);
+ disposition.setLast(last);
+ disposition.setSettled(settled);
+
+ disposition.setState(state);
+
+
+ if(settled)
+ {
+ if(role == Role.RECEIVER)
+ {
+ SequenceNumber pos = new SequenceNumber(first.intValue());
+ SequenceNumber end = new SequenceNumber(last.intValue());
+ while(pos.compareTo(end)<=0)
+ {
+ Delivery d = _incomingUnsettled.remove(new UnsignedInteger(pos.intValue()));
+
+/*
+ _availableIncomingCredit += d.getTransfers().size();
+*/
+
+ pos.incr();
+ }
+ }
+ }
+
+ send(disposition);
+ checkSendFlow();
+ }
+
+ public void settle(Role role, final UnsignedInteger deliveryId)
+ {
+ if(role == Role.RECEIVER)
+ {
+ Delivery d = _incomingUnsettled.remove(deliveryId);
+ if(d != null)
+ {
+/*
+ _availableIncomingCredit += d.getTransfers().size();
+*/
+ }
+ }
+ else
+ {
+ Delivery d = _outgoingUnsettled.remove(deliveryId);
+/* if(d != null)
+ {
+ _availableOutgoingCredit += d.getTransfers().size();
+
+ }*/
+ }
+
+ }
+
+ public void sendFlow()
+ {
+ sendFlow(new Flow());
+ }
+ public void sendFlow(final Flow flow)
+ {
+ final int nextIncomingId = _nextIncomingTransferId.intValue();
+ flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
+ flow.setIncomingWindow(UnsignedInteger.valueOf(_availableIncomingCredit));
+ _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _availableIncomingCredit);
+
+ flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue()));
+ flow.setOutgoingWindow(UnsignedInteger.valueOf(_availableOutgoingCredit));
+ send(flow);
+ }
+
+ public void sendFlowConditional()
+ {
+ UnsignedInteger clientsCredit = _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingTransferId.intValue()));
+ int i = UnsignedInteger.valueOf(_availableIncomingCredit).subtract(clientsCredit).compareTo(clientsCredit);
+ if(i >=0)
+ {
+ sendFlow();
+ }
+
+ }
+
+ public void sendDetach(Detach detach)
+ {
+ send(detach);
+
+ }
+
+ void doEnd(End end)
+ {
+ }
+
+ public void setNextIncomingId(final UnsignedInteger nextIncomingId)
+ {
+ _nextIncomingTransferId = new SequenceNumber(nextIncomingId.intValue());
+
+ }
+
+ public void setOutgoingSessionCredit(final UnsignedInteger outgoingSessionCredit)
+ {
+ _outgoingSessionCredit = outgoingSessionCredit;
+ }
+
+ public UnsignedInteger getNextOutgoingId()
+ {
+ return UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue());
+ }
+
+ public UnsignedInteger getOutgoingWindowSize()
+ {
+ return UnsignedInteger.valueOf(_availableOutgoingCredit);
+ }
+
+ public boolean hasCreditToSend()
+ {
+ boolean b = _outgoingSessionCredit != null && _outgoingSessionCredit.intValue() > 0;
+ boolean b1 = getOutgoingWindowSize() != null && getOutgoingWindowSize().compareTo(UnsignedInteger.ZERO) > 0;
+ return b && b1;
+ }
+
+ public UnsignedInteger getIncomingWindowSize()
+ {
+ return UnsignedInteger.valueOf(_availableIncomingCredit);
+ }
+
+ public SessionEventListener getSessionEventListener()
+ {
+ return _sessionEventListener;
+ }
+
+ public void setSessionEventListener(final SessionEventListener sessionEventListener)
+ {
+ _sessionEventListener = sessionEventListener;
+ }
+
+ public ConnectionEndpoint getConnection()
+ {
+ return _connection;
+ }
+
+ public SendingLinkEndpoint createTransactionController(String name, TxnCapability... capabilities)
+ {
+ Coordinator coordinator = new Coordinator();
+ coordinator.setCapabilities(capabilities);
+
+ Source src = new Source();
+
+ return createSendingLinkEndpoint(name, src, coordinator);
+ }
+
+ Map<String, LinkEndpoint> getLinkMap()
+ {
+ return _linkMap;
+ }
+
+
+ public boolean isEnded()
+ {
+ return _state == SessionState.ENDED || _connection.isClosed();
+ }
+
+ public boolean isActive()
+ {
+ return _state == SessionState.ACTIVE;
+ }
+}