diff options
Diffstat (limited to 'qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java')
-rw-r--r-- | qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java | 949 |
1 files changed, 497 insertions, 452 deletions
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index 0feaa48805..e35248f58c 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -1,452 +1,497 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.Source;
-import org.apache.qpid.amqp_1_0.type.Target;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class Sender implements DeliveryStateHandler
-{
- private SendingLinkEndpoint _endpoint;
- private int _id;
- private Session _session;
- private int _windowSize;
- private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
- private boolean _closed;
- private Error _error;
- private Runnable _remoteErrorTask;
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, false);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- boolean synchronous)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window) throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
- }
-
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window) throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, target, source, window, AcknowledgeMode.ALO);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, mode, null);
- }
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window, AcknowledgeMode mode)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, target, source, window, mode, null);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
- }
-
- private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
- {
- org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
- source.setAddress(sourceAddr);
- return source;
- }
-
- private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
- {
- org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
- target.setAddress(targetAddr);
- if(isDurable)
- {
- target.setDurable(TerminusDurability.UNSETTLED_STATE);
- target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
- }
- return target;
- }
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
-
- _session = session;
- session.getConnection().checkNotClosed();
- _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
- source, target, unsettled);
-
-
- switch(mode)
- {
- case ALO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case AMO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
- break;
- case EO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
- break;
-
- }
- _endpoint.setDeliveryStateHandler(this);
- _endpoint.attach();
- _windowSize = window;
-
- synchronized(_endpoint.getLock())
- {
- while(!(_endpoint.isAttached() || _endpoint.isDetached()))
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- throw new SenderCreationException(e);
- }
- }
- if(_endpoint.getTarget()== null)
- {
- throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
- };
- }
-
- _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
- {
-
- @Override
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- _error = detach.getError();
- if(_error != null)
- {
- remoteError();
- }
- super.remoteDetached(endpoint, detach);
- }
- });
- }
-
- public Source getSource()
- {
- return _endpoint.getSource();
- }
-
- public Target getTarget()
- {
- return _endpoint.getTarget();
- }
-
- public void send(Message message) throws LinkDetachedException
- {
- send(message, null, null);
- }
-
- public void send(Message message, final OutcomeAction action) throws LinkDetachedException
- {
- send(message, null, action);
- }
-
- public void send(Message message, final Transaction txn) throws LinkDetachedException
- {
- send(message, txn, null);
- }
-
- public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
- {
-
- List<Section> sections = message.getPayload();
-
- Transfer xfr = new Transfer();
-
- if(sections != null && !sections.isEmpty())
- {
- SectionEncoder encoder = _session.getSectionEncoder();
- encoder.reset();
-
- int sectionNumber = 0;
- for(Section section : sections)
- {
- encoder.encodeObject(section);
- }
-
-
- Binary encoding = encoder.getEncoding();
- ByteBuffer payload = encoding.asByteBuffer();
- xfr.setPayload(payload);
- }
- if(message.getDeliveryTag() == null)
- {
- message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
- }
- if(message.isResume())
- {
- xfr.setResume(Boolean.TRUE);
- }
- if(message.getDeliveryState() != null)
- {
- xfr.setState(message.getDeliveryState());
- }
-
- xfr.setDeliveryTag(message.getDeliveryTag());
- //xfr.setSettled(_windowSize ==0);
- if(txn != null)
- {
- xfr.setSettled(false);
- TransactionalState deliveryState = new TransactionalState();
- deliveryState.setTxnId(txn.getTxnId());
- xfr.setState(deliveryState);
- }
- else
- {
- xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
- }
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- if(_endpoint.isDetached())
- {
- throw new LinkDetachedException(_error);
- }
- if(action != null)
- {
- _outcomeActions.put(message.getDeliveryTag(), action);
- }
- _endpoint.transfer(xfr);
- //TODO - rationalise sending of flows
- // _endpoint.sendFlow();
- }
-
- if(_windowSize != 0)
- {
- synchronized(lock)
- {
-
-
- while(_endpoint.getUnsettledCount() >= _windowSize)
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- }
-
-
- }
-
- public void close() throws SenderClosingException
- {
-
- if(_windowSize != 0)
- {
- synchronized(_endpoint.getLock())
- {
-
-
- while(_endpoint.getUnsettledCount() > 0)
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- }
- _session.removeSender(this);
- _endpoint.setSource(null);
- _endpoint.detach();
- _closed = true;
-
- synchronized(_endpoint.getLock())
- {
- while(!_endpoint.isDetached())
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- throw new SenderClosingException(e);
- }
- }
- }
- }
-
- public boolean isClosed()
- {
- return _closed;
- }
-
- public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
- {
- if(state instanceof Outcome)
- {
- OutcomeAction action;
- if((action = _outcomeActions.remove(deliveryTag)) != null)
- {
- action.onOutcome(deliveryTag, (Outcome) state);
- }
- if(!Boolean.TRUE.equals(settled))
- {
- _endpoint.updateDisposition(deliveryTag, state, true);
- }
- }
- else if(state instanceof TransactionalState)
- {
- OutcomeAction action;
-
- if((action = _outcomeActions.remove(deliveryTag)) != null)
- {
- action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome());
- }
-
- }
- }
-
- public SendingLinkEndpoint getEndpoint()
- {
- return _endpoint;
- }
-
- public Map<Binary, DeliveryState> getRemoteUnsettled()
- {
- return _endpoint.getInitialUnsettledMap();
- }
-
- public Session getSession()
- {
- return _session;
- }
-
-
- private void remoteError()
- {
- if(_remoteErrorTask != null)
- {
- _remoteErrorTask.run();
- }
- }
-
-
- public void setRemoteErrorListener(Runnable listener)
- {
- _remoteErrorTask = listener;
- }
-
- public Error getError()
- {
- return _error;
- }
-
- public class SenderCreationException extends Exception
- {
- public SenderCreationException(Throwable e)
- {
- super(e);
- }
-
- public SenderCreationException(String e)
- {
- super(e);
-
- }
- }
-
- public class SenderClosingException extends Exception
- {
- public SenderClosingException(Throwable e)
- {
- super(e);
- }
- }
-
- public static interface OutcomeAction
- {
- public void onOutcome(Binary deliveryTag, Outcome outcome);
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; +import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.Source; +import org.apache.qpid.amqp_1_0.type.Target; +import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; +import org.apache.qpid.amqp_1_0.type.transport.*; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.qpid.amqp_1_0.type.transport.Error; + +public class Sender implements DeliveryStateHandler +{ + private SendingLinkEndpoint _endpoint; + private int _id; + private Session _session; + private int _windowSize; + private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>()); + private boolean _closed; + private Error _error; + private Runnable _remoteErrorTask; + private Outcome _defaultOutcome; + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, false); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + boolean synchronous) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window) throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO); + } + + + public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, + int window) throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, target, source, window, AcknowledgeMode.ALO); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window, AcknowledgeMode mode) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, window, mode, null); + } + + public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, + int window, AcknowledgeMode mode) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, target, source, window, mode, null); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled); + } + + protected void configureSource(org.apache.qpid.amqp_1_0.type.messaging.Source source) + { + + } + + protected void configureTarget(org.apache.qpid.amqp_1_0.type.messaging.Target target) + { + + } + + private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr) + { + org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source(); + source.setAddress(sourceAddr); + return source; + } + + private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable) + { + org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target(); + target.setAddress(targetAddr); + if(isDurable) + { + target.setDurable(TerminusDurability.UNSETTLED_STATE); + target.setExpiryPolicy(TerminusExpiryPolicy.NEVER); + } + return target; + } + + public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, + int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled) + throws SenderCreationException, ConnectionClosedException + { + + _session = session; + session.getConnection().checkNotClosed(); + configureSource(source); + configureTarget(target); + _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName, + source, target, unsettled); + + + switch(mode) + { + case ALO: + _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + break; + case AMO: + _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED); + break; + case EO: + _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND); + break; + + } + _endpoint.setDeliveryStateHandler(this); + _endpoint.attach(); + _windowSize = window; + + synchronized(_endpoint.getLock()) + { + while(!(_endpoint.isAttached() || _endpoint.isDetached())) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + throw new SenderCreationException(e); + } + } + if(_endpoint.getTarget()== null) + { + throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress()); + }; + } + + _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener() + { + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + _error = detach.getError(); + if(_error != null) + { + remoteError(); + } + super.remoteDetached(endpoint, detach); + } + }); + + _defaultOutcome = source.getDefaultOutcome(); + if(_defaultOutcome == null) + { + if(source.getOutcomes() == null || source.getOutcomes().length == 0) + { + _defaultOutcome = new Accepted(); + } + else if(source.getOutcomes().length == 1) + { + + final AMQPDescribedTypeRegistry describedTypeRegistry = _endpoint.getSession() + .getConnection() + .getDescribedTypeRegistry(); + + DescribedTypeConstructor constructor = describedTypeRegistry + .getConstructor(source.getOutcomes()[0]); + if(constructor != null) + { + Object impliedOutcome = constructor.construct(Collections.EMPTY_LIST); + if(impliedOutcome instanceof Outcome) + { + _defaultOutcome = (Outcome) impliedOutcome; + } + } + + } + } + } + + public Source getSource() + { + return _endpoint.getSource(); + } + + public Target getTarget() + { + return _endpoint.getTarget(); + } + + public void send(Message message) throws LinkDetachedException + { + send(message, null, null); + } + + public void send(Message message, final OutcomeAction action) throws LinkDetachedException + { + send(message, null, action); + } + + public void send(Message message, final Transaction txn) throws LinkDetachedException + { + send(message, txn, null); + } + + public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException + { + + List<Section> sections = message.getPayload(); + + Transfer xfr = new Transfer(); + + if(sections != null && !sections.isEmpty()) + { + SectionEncoder encoder = _session.getSectionEncoder(); + encoder.reset(); + + int sectionNumber = 0; + for(Section section : sections) + { + encoder.encodeObject(section); + } + + + Binary encoding = encoder.getEncoding(); + ByteBuffer payload = encoding.asByteBuffer(); + xfr.setPayload(payload); + } + if(message.getDeliveryTag() == null) + { + message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes())); + } + if(message.isResume()) + { + xfr.setResume(Boolean.TRUE); + } + if(message.getDeliveryState() != null) + { + xfr.setState(message.getDeliveryState()); + } + + xfr.setDeliveryTag(message.getDeliveryTag()); + //xfr.setSettled(_windowSize ==0); + if(txn != null) + { + xfr.setSettled(false); + TransactionalState deliveryState = new TransactionalState(); + deliveryState.setTxnId(txn.getTxnId()); + xfr.setState(deliveryState); + } + else + { + xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED); + } + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached()) + { + try + { + lock.wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + if(_endpoint.isDetached()) + { + throw new LinkDetachedException(_error); + } + if(action != null) + { + _outcomeActions.put(message.getDeliveryTag(), action); + } + _endpoint.transfer(xfr); + //TODO - rationalise sending of flows + // _endpoint.sendFlow(); + } + + if(_windowSize != 0) + { + synchronized(lock) + { + + + while(_endpoint.getUnsettledCount() >= _windowSize) + { + try + { + lock.wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + } + + + } + + public void close() throws SenderClosingException + { + + if(_windowSize != 0) + { + synchronized(_endpoint.getLock()) + { + + + while(_endpoint.getUnsettledCount() > 0) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + } + _session.removeSender(this); + _endpoint.setSource(null); + _endpoint.detach(); + _closed = true; + + synchronized(_endpoint.getLock()) + { + while(!_endpoint.isDetached()) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + throw new SenderClosingException(e); + } + } + } + } + + public boolean isClosed() + { + return _closed; + } + + public void handle(Binary deliveryTag, DeliveryState state, Boolean settled) + { + if(state instanceof Outcome) + { + OutcomeAction action; + if((action = _outcomeActions.remove(deliveryTag)) != null) + { + + final Outcome outcome = (Outcome) state; + action.onOutcome(deliveryTag, (outcome == null && settled) ? _defaultOutcome : outcome); + } + if(!Boolean.TRUE.equals(settled)) + { + _endpoint.updateDisposition(deliveryTag, state, true); + } + } + else if(state instanceof TransactionalState) + { + OutcomeAction action; + if((action = _outcomeActions.remove(deliveryTag)) != null) + { + final Outcome outcome = ((TransactionalState) state).getOutcome(); + action.onOutcome(deliveryTag, outcome == null ? _defaultOutcome : outcome); + } + + } + } + + public SendingLinkEndpoint getEndpoint() + { + return _endpoint; + } + + public Map<Binary, DeliveryState> getRemoteUnsettled() + { + return _endpoint.getInitialUnsettledMap(); + } + + public Session getSession() + { + return _session; + } + + + private void remoteError() + { + if(_remoteErrorTask != null) + { + _remoteErrorTask.run(); + } + } + + + public void setRemoteErrorListener(Runnable listener) + { + _remoteErrorTask = listener; + } + + public Error getError() + { + return _error; + } + + public class SenderCreationException extends Exception + { + public SenderCreationException(Throwable e) + { + super(e); + } + + public SenderCreationException(String e) + { + super(e); + + } + } + + public class SenderClosingException extends Exception + { + public SenderClosingException(Throwable e) + { + super(e); + } + } + + public static interface OutcomeAction + { + public void onOutcome(Binary deliveryTag, Outcome outcome); + } +} |