summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java26
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java209
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java84
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java34
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java145
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java29
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java41
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java76
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java60
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java67
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java33
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java18
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java17
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java3
21 files changed, 345 insertions, 570 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 92f9ebe07c..b1a22155d6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -36,11 +36,13 @@ import javax.jms.XASession;
import javax.net.ssl.SSLContext;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
@@ -69,8 +71,30 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
public void closeConnection(long timeout) throws JMSException, AMQException
{
- _conn.getProtocolHandler().closeConnection(timeout);
+ final AMQStateManager stateManager = _conn.getProtocolHandler().getStateManager();
+ final AMQState currentState = stateManager.getCurrentState();
+ if (currentState.equals(AMQState.CONNECTION_CLOSED))
+ {
+ _logger.debug("Connection already closed.");
+ }
+ else if (currentState.equals(AMQState.CONNECTION_CLOSING))
+ {
+ _logger.debug("Connection already closing, awaiting closed state.");
+ final StateWaiter closeWaiter = new StateWaiter(stateManager, currentState, EnumSet.of(AMQState.CONNECTION_CLOSED));
+ try
+ {
+ closeWaiter.await(timeout);
+ }
+ catch (AMQTimeoutException te)
+ {
+ throw new AMQTimeoutException("Close did not complete in timely fashion", te);
+ }
+ }
+ else
+ {
+ _conn.getProtocolHandler().closeConnection(timeout);
+ }
}
public AMQConnectionDelegate_8_0(AMQConnection conn)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 3ef32fb008..acd46da11a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -793,7 +793,7 @@ public abstract class AMQDestination implements Destination, Referenceable
return _browseOnly;
}
- public void setBrowseOnly(boolean b)
+ private void setBrowseOnly(boolean b)
{
_browseOnly = b;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index e0da1ef41f..30c7403a90 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -70,7 +70,6 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
@@ -88,8 +87,6 @@ import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
@@ -216,8 +213,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
- protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
-
/**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
@@ -367,7 +362,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* Set when recover is called. This is to handle the case where recover() is called by application code during
* onMessage() processing to ensure that an auto ack is not sent.
*/
- private boolean _inRecovery;
+ private volatile boolean _sessionInRecovery;
+
+ /**
+ * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
+ * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
+ */
+ private volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
@@ -617,30 +618,27 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new IllegalStateException("Session is already closed");
}
- else if (hasFailedOver())
+ else if (hasFailedOverDirty())
{
+ //perform an implicit recover in this scenario
+ recover();
+
+ //notify the consumer
throw new IllegalStateException("has failed over");
}
- while (true)
+ try
{
- Long tag = _unacknowledgedMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- try
- {
- acknowledgeMessage(tag, false);
- }
- catch (TransportException e)
- {
- throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
- }
+ acknowledgeImpl();
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
}
}
+ protected abstract void acknowledgeImpl() throws JMSException;
+
/**
* Acknowledge one or many messages.
*
@@ -849,42 +847,28 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
* not mean that the commit is known to have failed, merely that it is not known whether it
* failed or not.
- * @todo Be aware of possible changes to parameter order as versions change.
*/
public void commit() throws JMSException
{
checkTransacted();
- try
+ //Check that we are clean to commit.
+ if (_failedOverDirty)
{
- //Check that we are clean to commit.
- if (_failedOverDirty)
- {
- rollback();
-
- throw new TransactionRolledBackException("Connection failover has occured since last send. " +
- "Forced rollback");
- }
+ rollback();
+ throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
+ "The session transaction was rolled back.");
+ }
- // Acknowledge all delivered messages
- while (true)
- {
- Long tag = _deliveredMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- acknowledgeMessage(tag, false);
- }
- // Commits outstanding messages and acknowledgments
- sendCommit();
+ try
+ {
+ commitImpl();
markClean();
}
catch (AMQException e)
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
+ throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e);
}
catch (FailoverException e)
{
@@ -896,8 +880,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- public abstract void sendCommit() throws AMQException, FailoverException;
-
+ protected abstract void commitImpl() throws AMQException, FailoverException, TransportException;
public void confirmConsumerCancelled(int consumerTag)
{
@@ -975,7 +958,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
}
- public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
+ protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
checkValidDestination(destination);
@@ -989,15 +972,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
- ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public C createExclusiveConsumer(Destination destination) throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
- ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -1005,7 +980,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
- messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ messageSelector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -1014,16 +989,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
- messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
- throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
- messageSelector, null, false, false);
+ messageSelector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
@@ -1031,23 +997,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
- boolean exclusive, String selector) throws JMSException
+ boolean exclusive, String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
- String selector, FieldTable rawSelector) throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -1055,7 +1013,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()),
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination),
false);
}
@@ -1244,12 +1202,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return createProducerImpl(destination, mandatory, immediate);
}
- public P createProducer(Destination destination, boolean mandatory, boolean immediate,
- boolean waitUntilSent) throws JMSException
- {
- return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
- }
-
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
checkNotClosed();
@@ -1467,9 +1419,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
- Topic dest = checkValidTopic(topic);
+ checkValidTopic(topic);
- return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
+ return new TopicSubscriberAdaptor<C>(topic,
+ createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false));
}
/**
@@ -1486,10 +1439,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
- Topic dest = checkValidTopic(topic);
+ checkValidTopic(topic);
- // AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor<C>(topic,
+ createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal,
+ true, messageSelector, null, false, false));
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1591,10 +1545,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
abstract public void sync() throws AMQException;
- public int getAcknowledgeMode() throws JMSException
+ public int getAcknowledgeMode()
{
- checkNotClosed();
-
return _acknowledgeMode;
}
@@ -1654,10 +1606,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return _ticket;
}
- public boolean getTransacted() throws JMSException
+ public boolean getTransacted()
{
- checkNotClosed();
-
return _transacted;
}
@@ -1759,8 +1709,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// flush any acks we are holding in the buffer.
flushAcknowledgments();
- // this is set only here, and the before the consumer's onMessage is called it is set to false
- _inRecovery = true;
+ // this is only set true here, and only set false when the consumers preDeliver method is called
+ _sessionInRecovery = true;
boolean isSuspended = isSuspended();
@@ -1768,9 +1718,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
suspendChannel(true);
}
-
+
+ // Set to true to short circuit delivery of anything currently
+ //in the pre-dispatch queue.
+ _usingDispatcherForCleanup = true;
+
syncDispatchQueue();
-
+
+ // Set to false before sending the recover as 0-8/9/9-1 will
+ //send messages back before the recover completes, and we
+ //probably shouldn't clean those! ;-)
+ _usingDispatcherForCleanup = false;
+
if (_dispatcher != null)
{
_dispatcher.recover();
@@ -1779,10 +1738,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
sendRecover();
markClean();
-
- // Set inRecovery to false before you start message flow again again.
- _inRecovery = false;
-
+
if (!isSuspended)
{
suspendChannel(false);
@@ -2018,6 +1974,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkTemporaryDestination(destination);
+ if(!noConsume && isBrowseOnlyDestination(destination))
+ {
+ throw new InvalidDestinationException("The consumer being created is not 'noConsume'," +
+ "but a 'browseOnly' Destination has been supplied.");
+ }
+
final String messageSelector;
if (_strictAMQP && !((selector == null) || selector.equals("")))
@@ -2176,7 +2138,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
boolean isInRecovery()
{
- return _inRecovery;
+ return _sessionInRecovery;
}
boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
@@ -2298,7 +2260,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void setInRecovery(boolean inRecovery)
{
- _inRecovery = inRecovery;
+ _sessionInRecovery = inRecovery;
}
boolean isStarted()
@@ -2637,15 +2599,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract void sendConsume(C consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
- private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
+ private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
throws JMSException
{
- return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
- }
-
- private P createProducerImpl(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent) throws JMSException
- {
return new FailoverRetrySupport<P, JMSException>(
new FailoverProtectedOperation<P, JMSException>()
{
@@ -2658,7 +2614,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
producer = createMessageProducer(destination, mandatory,
- immediate, waitUntilSent, producerId);
+ immediate, producerId);
}
catch (TransportException e)
{
@@ -2673,7 +2629,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException;
+ final boolean immediate, final long producerId) throws JMSException;
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
@@ -3113,21 +3069,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*
* @return boolean true if failover has occured.
*/
- public boolean hasFailedOver()
+ public boolean hasFailedOverDirty()
{
return _failedOverDirty;
}
- /**
- * Check to see if any message have been sent in this transaction and have not been commited.
- *
- * @return boolean true if a message has been sent but not commited
- */
- public boolean isDirty()
- {
- return _dirty;
- }
-
public void setTicket(int ticket)
{
_ticket = ticket;
@@ -3391,7 +3337,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
rejectMessage(message, true);
}
- else if (isInRecovery())
+ else if (_usingDispatcherForCleanup)
{
_unacknowledgedMessageTags.add(deliveryTag);
}
@@ -3575,4 +3521,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
return code;
}
+
+ private boolean isBrowseOnlyDestination(Destination destination)
+ {
+ return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly());
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index bfbb9f7148..86e1fc08de 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -270,7 +270,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
long prefetch = getAMQConnection().getMaxPrefetch();
- if (unackedCount >= prefetch/2 || maxAckDelay <= 0)
+ if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE)
{
flushAcknowledgments();
}
@@ -412,25 +412,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
-
- /**
- * Commit the receipt and the delivery of all messages exchanged by this session resources.
- */
- public void sendCommit() throws AMQException, FailoverException
- {
- getQpidSession().setAutoSync(true);
- try
- {
- getQpidSession().txCommit();
- }
- finally
- {
- getQpidSession().setAutoSync(false);
- }
- // We need to sync so that we get notify of an error.
- sync();
- }
-
/**
* Create a queue with a given name.
*
@@ -463,6 +444,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void sendRecover() throws AMQException, FailoverException
{
// release all unacked messages
+ RangeSet ranges = gatherUnackedRangeSet();
+ getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ // We need to sync so that we get notify of an error.
+ sync();
+ }
+
+ private RangeSet gatherUnackedRangeSet()
+ {
RangeSet ranges = new RangeSet();
while (true)
{
@@ -471,11 +460,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
break;
}
- ranges.add((int) (long) tag);
+
+ ranges.add(tag.intValue());
}
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
- // We need to sync so that we get notify of an error.
- sync();
+
+ return ranges;
}
@@ -676,13 +665,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Create an 0_10 message producer
*/
public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent,
- long producerId) throws JMSException
+ final boolean immediate, final long producerId) throws JMSException
{
try
{
return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
- getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+ getProtocolHandler(), producerId, immediate, mandatory);
}
catch (AMQException e)
{
@@ -998,32 +986,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- @Override
- public void commit() throws JMSException
+ public void commitImpl() throws AMQException, FailoverException, TransportException
{
- checkTransacted();
- try
- {
- if( _txSize > 0 )
- {
- messageAcknowledge(_txRangeSet, true);
- _txRangeSet.clear();
- _txSize = 0;
- }
- sendCommit();
- }
- catch(TransportException e)
+ if( _txSize > 0 )
{
- throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
+ messageAcknowledge(_txRangeSet, true);
+ _txRangeSet.clear();
+ _txSize = 0;
}
- catch (AMQException e)
+
+ getQpidSession().setAutoSync(true);
+ try
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ getQpidSession().txCommit();
}
- catch (FailoverException e)
+ finally
{
- throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+ getQpidSession().setAutoSync(false);
}
+ // We need to sync so that we get notify of an error.
+ sync();
}
protected final boolean tagLE(long tag1, long tag2)
@@ -1386,4 +1368,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return sb.toString();
}
+ protected void acknowledgeImpl()
+ {
+ RangeSet range = gatherUnackedRangeSet();
+
+ if(range.size() > 0 )
+ {
+ messageAcknowledge(range, true);
+ getQpidSession().sync();
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index c010e4c7ed..ccb2b00947 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -76,6 +76,7 @@ import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,6 +126,20 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
return getProtocolHandler().getProtocolVersion();
}
+ protected void acknowledgeImpl()
+ {
+ while (true)
+ {
+ Long tag = _unacknowledgedMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+
+ acknowledgeMessage(tag, false);
+ }
+ }
+
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
@@ -170,8 +185,20 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
}
}
- public void sendCommit() throws AMQException, FailoverException
+ public void commitImpl() throws AMQException, FailoverException, TransportException
{
+ // Acknowledge all delivered messages
+ while (true)
+ {
+ Long tag = _deliveredMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+
+ acknowledgeMessage(tag, false);
+ }
+
final AMQProtocolHandler handler = getProtocolHandler();
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
@@ -401,12 +428,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException
+ final boolean immediate, long producerId) throws JMSException
{
try
{
return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
- this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+ this, getProtocolHandler(), producerId, immediate, mandatory);
}
catch (AMQException e)
{
@@ -615,5 +642,4 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
return null;
}
}
-
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 754055ad98..3b807591b0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -37,10 +37,7 @@ import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.SortedSet;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -118,29 +115,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
protected final int _acknowledgeMode;
/**
- * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
- */
- private int _outstanding;
-
- /**
- * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
- * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
- */
- private boolean _dups_ok_acknowledge_send;
-
- /**
* List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
*/
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
- /** The last tag that was "multiple" acknowledged on this session (if transacted) */
- private long _lastAcked;
-
- /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
- private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
-
- private final Object _commitLock = new Object();
-
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
* receive() is in progress.
@@ -290,17 +268,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
- protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
- {
- if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
- }
-
- _session.setInRecovery(false);
- preDeliver(jmsMsg);
- }
-
/**
* @param immediate if true then return immediately if the connection is failing over
*
@@ -323,14 +290,14 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
- if (!_receiving.compareAndSet(false, true))
+ if (isMessageListenerSet())
{
- throw new javax.jms.IllegalStateException("Another thread is already receiving.");
+ throw new javax.jms.IllegalStateException("A listener has already been set.");
}
- if (isMessageListenerSet())
+ if (!_receiving.compareAndSet(false, true))
{
- throw new javax.jms.IllegalStateException("A listener has already been set.");
+ throw new javax.jms.IllegalStateException("Another thread is already receiving.");
}
_receivingThread = Thread.currentThread();
@@ -409,7 +376,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preApplicationProcessing(m);
+ preDeliver(m);
postDeliver(m);
}
return m;
@@ -482,7 +449,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preApplicationProcessing(m);
+ preDeliver(m);
postDeliver(m);
}
@@ -734,7 +701,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
if (isMessageListenerSet())
{
- preApplicationProcessing(jmsMessage);
+ preDeliver(jmsMessage);
getMessageListener().onMessage(jmsMessage);
postDeliver(jmsMessage);
}
@@ -758,49 +725,42 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
- void preDeliver(AbstractJMSMessage msg)
+ protected void preDeliver(AbstractJMSMessage msg)
{
+ _session.setInRecovery(false);
+
switch (_acknowledgeMode)
{
-
case Session.PRE_ACKNOWLEDGE:
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
break;
-
+ case Session.AUTO_ACKNOWLEDGE:
+ //fall through
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+ break;
case Session.CLIENT_ACKNOWLEDGE:
// we set the session so that when the user calls acknowledge() it can call the method on session
// to send out the appropriate frame
msg.setAMQSession(_session);
+ _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+ _session.markDirty();
break;
case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- _session.addDeliveredMessage(msg.getDeliveryTag());
- _session.markDirty();
- }
-
+ _session.addDeliveredMessage(msg.getDeliveryTag());
+ _session.markDirty();
+ break;
+ case Session.NO_ACKNOWLEDGE:
+ //do nothing.
+ //path used for NO-ACK consumers, and browsers (see constructor).
break;
}
-
}
void postDeliver(AbstractJMSMessage msg)
{
switch (_acknowledgeMode)
{
-
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- _session.markDirty();
- break;
-
case Session.DUPS_OK_ACKNOWLEDGE:
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
@@ -838,63 +798,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return null;
}
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- */
- void acknowledgeDelivered()
- {
- synchronized(_commitLock)
- {
- ArrayList<Long> tagsToAck = new ArrayList<Long>();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- tagsToAck.add(_receivedDeliveryTags.poll());
- }
-
- Collections.sort(tagsToAck);
-
- long prevAcked = _lastAcked;
- long oldAckPoint = -1;
-
- while(oldAckPoint != prevAcked)
- {
- oldAckPoint = prevAcked;
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
- {
- tagsToAckIterator.remove();
- prevAcked++;
- }
-
- Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
- while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
- {
- previousAckIterator.remove();
- prevAcked++;
- }
-
- }
- if(prevAcked != _lastAcked)
- {
- _session.acknowledgeMessage(prevAcked, true);
- _lastAcked = prevAcked;
- }
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext())
- {
- Long tag = tagsToAckIterator.next();
- _session.acknowledgeMessage(tag, false);
- _previouslyAcked.add(tag);
- }
- }
- }
-
-
void notifyError(Throwable cause)
{
// synchronized (_closed)
@@ -973,7 +876,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
public boolean isNoConsume()
{
- return _noConsume || _destination.isBrowseOnly() ;
+ return _noConsume;
}
public void rollback()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 47da59724c..3c24c67f9b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -66,19 +66,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private boolean _preAcquire = true;
/**
- * Indicate whether this consumer is started.
- */
- private boolean _isStarted = false;
-
- /**
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
private long capacity = 0;
-
- //--- constructor
+
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
@@ -104,7 +98,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_preAcquire = false;
}
}
- _isStarted = connection.started();
// Destination setting overrides connection defaults
if (destination.getDestSyntax() == DestSyntax.ADDR &&
@@ -172,8 +165,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
- //----- overwritten methods
-
/**
* This method is invoked when this consumer is stopped.
* It tells the broker to stop delivering messages to this consumer.
@@ -203,11 +194,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
super.notifyMessage(messageFrame);
}
- @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ @Override
+ protected void preDeliver(AbstractJMSMessage jmsMsg)
{
- super.preApplicationProcessing(jmsMsg);
- if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
+ super.preDeliver(jmsMsg);
+
+ if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
{
+ //For 0-10 we need to ensure that all messages are indicated processed in some way to
+ //ensure their AMQP command-id is marked completed, and so we must send a completion
+ //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring.
+ //Add message to the unacked message list to ensure we dont lose record of it before
+ //sending a completion of some sort.
_session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
}
}
@@ -219,7 +217,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _messageFactory.createMessage(msg.getMessageTransfer());
}
- // private methods
/**
* Check whether a message can be delivered to this consumer.
*
@@ -457,10 +454,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
- !_session.isInRecovery() &&
- _session.getAMQConnection().getSyncAck())
+ !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
{
- ((AMQSession_0_10) getSession()).flushAcknowledgments();
((AMQSession_0_10) getSession()).getQpidSession().sync();
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 2bfca025b2..bf4de782a5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -114,8 +114,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private final boolean _mandatory;
- private final boolean _waitUntilSent;
-
private boolean _disableMessageId;
private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
@@ -127,8 +125,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
- boolean waitUntilSent) throws AMQException
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
{
_connection = connection;
_destination = destination;
@@ -144,7 +141,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
_immediate = immediate;
_mandatory = mandatory;
- _waitUntilSent = waitUntilSent;
_userID = connection.getUsername();
setPublishMode();
}
@@ -364,19 +360,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
- {
- checkPreConditions();
- checkDestination(destination);
- synchronized (_connection.getFailoverMutex())
- {
- validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
- waitUntilSent);
- }
- }
-
private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
{
if (message instanceof AbstractJMSMessage)
@@ -451,12 +434,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
- {
- sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
- }
-
/**
* The caller of this method must hold the failover mutex.
*
@@ -471,23 +448,13 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ boolean mandatory, boolean immediate) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
AbstractJMSMessage message = convertToNativeMessage(origMessage);
- if (_transacted)
- {
- if (_session.hasFailedOver() && _session.isDirty())
- {
- throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
- new AMQSessionDirtyException("Failover has occurred and session is dirty " +
- "so unable to send."));
- }
- }
-
UUID messageId = null;
if (_disableMessageId)
{
@@ -501,7 +468,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
try
{
- sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+ sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate);
}
catch (TransportException e)
{
@@ -526,7 +493,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean wait) throws JMSException;
+ boolean immediate) throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 1fa5c1003f..57f64c2f92 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -61,10 +61,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- boolean immediate, boolean mandatory, boolean waitUntilSent) throws AMQException
+ boolean immediate, boolean mandatory) throws AMQException
{
- super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
- mandatory, waitUntilSent);
+ super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
userIDBytes = Strings.toUTF8(_userID);
}
@@ -104,7 +103,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
*/
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean wait) throws JMSException
+ boolean immediate) throws JMSException
{
message.prepareForSending();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 26e9814e33..34d2ade723 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -45,10 +45,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
{
BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
- boolean waitUntilSent) throws AMQException
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
{
- super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory,waitUntilSent);
+ super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
}
void declareDestination(AMQDestination destination)
@@ -73,7 +72,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean wait) throws JMSException
+ boolean immediate) throws JMSException
{
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
destination.getExchangeName(),
@@ -168,7 +167,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
throw jmse;
}
- _protocolHandler.writeFrame(compositeFrame, wait);
+ _protocolHandler.writeFrame(compositeFrame);
}
/**
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 2cf19bf391..b9d4d6fa95 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -78,7 +78,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann
{
throw new AMQNoRouteException("Error: " + reason, null, null);
}
- else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+ else if (errorCode == AMQConstant.ARGUMENT_INVALID)
{
_logger.debug("Broker responded with Invalid Argument.");
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
index c2821591d8..a9434edf49 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
@@ -26,9 +26,7 @@ import org.apache.qpid.client.AMQSession;
import javax.jms.Destination;
import javax.jms.JMSException;
-import java.nio.ByteBuffer;
import java.util.Enumeration;
-import java.util.Map;
import java.util.UUID;
public interface AMQMessageDelegate
@@ -130,9 +128,9 @@ public interface AMQMessageDelegate
void removeProperty(final String propertyName) throws JMSException;
- void setAMQSession(final AMQSession s);
+ void setAMQSession(final AMQSession<?,?> s);
- AMQSession getAMQSession();
+ AMQSession<?,?> getAMQSession();
long getDeliveryTag();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 43b3b85641..f360b546b2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -37,12 +37,10 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
-import javax.jms.Session;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
@@ -76,13 +74,8 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
private Destination _destination;
-
private MessageProperties _messageProps;
private DeliveryProperties _deliveryProps;
- /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- private AMQSession _session;
- private final long _deliveryTag;
-
protected AMQMessageDelegate_0_10()
{
@@ -92,9 +85,9 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
{
+ super(deliveryTag);
_messageProps = messageProps;
_deliveryProps = deliveryProps;
- _deliveryTag = deliveryTag;
_readableProperties = (_messageProps != null);
AMQDestination dest;
@@ -205,7 +198,6 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
}
}
-
public long getJMSTimestamp() throws JMSException
{
return _deliveryProps.getTimestamp();
@@ -291,7 +283,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
try
{
- return AMQDestination.createDestination("ADDR:" + addr.toString());
+ return AMQDestination.createDestination("ADDR:" + addr);
}
catch(Exception e)
{
@@ -325,14 +317,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
try
{
- int type = ((AMQSession_0_10)_session).resolveAddressType(amqd);
+ int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
if (type == AMQDestination.QUEUE_TYPE)
{
- ((AMQSession_0_10)_session).setLegacyFiledsForQueueType(amqd);
+ ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd);
}
else
{
- ((AMQSession_0_10)_session).setLegacyFiledsForTopicType(amqd);
+ ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd);
}
}
catch(AMQException ex)
@@ -905,64 +897,6 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
_readableProperties = false;
}
-
- public void acknowledgeThis() throws JMSException
- {
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- if (_session.getAMQConnection().isClosed())
- {
- throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
- // we set multiple to true here since acknowledgment implies acknowledge of all previous messages
- // received on the session
- _session.acknowledgeMessage(_deliveryTag, true);
- }
- }
-
- public void acknowledge() throws JMSException
- {
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.acknowledge();
- }
- }
-
-
- /**
- * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
- * acknowledge()
- *
- * @param s the AMQ session that delivered this message
- */
- public void setAMQSession(AMQSession s)
- {
- _session = s;
- }
-
- public AMQSession getAMQSession()
- {
- return _session;
- }
-
- /**
- * Get the AMQ message number assigned to this message
- *
- * @return the message number
- */
- public long getDeliveryTag()
- {
- return _deliveryTag;
- }
-
-
-
-
-
-
protected void checkPropertyName(CharSequence propertyName)
{
if (propertyName == null)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index b9ba946a20..9ab03412fe 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -30,7 +30,6 @@ import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
-import javax.jms.Session;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -60,15 +59,12 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
private ContentHeaderProperties _contentHeaderProperties;
- /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- private AMQSession _session;
- private final long _deliveryTag;
// The base set of items that needs to be set.
private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
{
+ super(deliveryTag);
_contentHeaderProperties = properties;
- _deliveryTag = deliveryTag;
_readableProperties = (_contentHeaderProperties != null);
_headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
: (new BasicContentHeaderProperties()).getHeaders() );
@@ -518,58 +514,4 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
_readableProperties = false;
}
-
-
- public void acknowledgeThis() throws JMSException
- {
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- if (_session.getAMQConnection().isClosed())
- {
- throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
- // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
- // received on the session
- _session.acknowledgeMessage(_deliveryTag, true);
- }
- }
-
- public void acknowledge() throws JMSException
- {
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.acknowledge();
- }
- }
-
-
- /**
- * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
- * acknowledge()
- *
- * @param s the AMQ session that delivered this message
- */
- public void setAMQSession(AMQSession s)
- {
- _session = s;
- }
-
- public AMQSession getAMQSession()
- {
- return _session;
- }
-
- /**
- * Get the AMQ message number assigned to this message
- *
- * @return the message number
- */
- public long getDeliveryTag()
- {
- return _deliveryTag;
- }
-
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
index 89fbc9722c..1b6c0c751d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
@@ -23,9 +23,13 @@ package org.apache.qpid.client.message;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -78,7 +82,25 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
new ExchangeInfo(ExchangeDefaults.HEADERS_EXCHANGE_NAME.toString(),
ExchangeDefaults.HEADERS_EXCHANGE_CLASS.toString(),
AMQDestination.QUEUE_TYPE));
-
+ }
+
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+ private AMQSession<?,?> _session;
+ private final long _deliveryTag;
+
+ protected AbstractAMQMessageDelegate(long deliveryTag)
+ {
+ _deliveryTag = deliveryTag;
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ *
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
}
/**
@@ -157,6 +179,47 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
{
return _exchangeMap.containsKey(exchange);
}
+
+ public void acknowledgeThis() throws JMSException
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ if (_session.getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _session.acknowledgeMessage(getDeliveryTag(), true);
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ _session.acknowledge();
+ }
+ }
+
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+ * acknowledge()
+ *
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession<?,?> s)
+ {
+ _session = s;
+ }
+
+ public AMQSession<?,?> getAMQSession()
+ {
+ return _session;
+ }
}
class ExchangeInfo
@@ -202,5 +265,5 @@ class ExchangeInfo
public void setDestType(int destType)
{
this.destType = destType;
- }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 208496ec41..284954edba 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -30,7 +30,6 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.AMQConnectionClosedException;
@@ -66,7 +65,6 @@ import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.NetworkTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -516,18 +514,7 @@ public class AMQProtocolHandler implements ProtocolEngine
return getStateManager().createWaiter(states);
}
- /**
- * Convenience method that writes a frame to the protocol session. Equivalent to calling
- * getProtocolSession().write().
- *
- * @param frame the frame to write
- */
- public void writeFrame(AMQDataBlock frame)
- {
- writeFrame(frame, false);
- }
-
- public synchronized void writeFrame(AMQDataBlock frame, boolean wait)
+ public synchronized void writeFrame(AMQDataBlock frame)
{
final ByteBuffer buf = asByteBuffer(frame);
_writtenBytes += buf.remaining();
@@ -678,22 +665,21 @@ public class AMQProtocolHandler implements ProtocolEngine
* <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
* anyway.
*
- * @param timeout The timeout to wait for an acknowledgement to the close request.
+ * @param timeout The timeout to wait for an acknowledgment to the close request.
*
* @throws AMQException If the close fails for any reason.
*/
public void closeConnection(long timeout) throws AMQException
{
- ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."), 0, 0);
-
- final AMQFrame frame = body.generateFrame(0);
-
- //If the connection is already closed then don't do a syncWrite
if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
+ // Connection is already closed then don't do a syncWrite
try
{
+ final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client is closing the connection."), 0, 0);
+ final AMQFrame frame = body.generateFrame(0);
+
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
_network.close();
closed();
@@ -704,10 +690,9 @@ public class AMQProtocolHandler implements ProtocolEngine
}
catch (FailoverException e)
{
- _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
+ _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
}
}
-
}
/** @return the number of bytes read from this protocol session */
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 5b7d272506..a938bd47f8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,27 +20,35 @@
*/
package org.apache.qpid.client.protocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import javax.jms.JMSException;
import javax.security.sasl.SaslClient;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -289,22 +297,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return _connection.getSession(channelId);
}
- /**
- * Convenience method that writes a frame to the protocol session. Equivalent to calling
- * getProtocolSession().write().
- *
- * @param frame the frame to write
- */
public void writeFrame(AMQDataBlock frame)
{
_protocolHandler.writeFrame(frame);
}
- public void writeFrame(AMQDataBlock frame, boolean wait)
- {
- _protocolHandler.writeFrame(frame, wait);
- }
-
/**
* Starts the process of closing a session
*
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 9c7d62670c..0d6fc727c1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.io.IOException;
/**
* The state manager is responsible for managing the state of the protocol session. <p/>
@@ -48,7 +47,7 @@ import java.io.IOException;
*
* The two step process is required as there is an inherit race condition between starting a process that will cause
* the state to change and then attempting to wait for that change. The interest in the change must be first set up so
- * that any asynchrous errors that occur can be delivered to the correct waiters.
+ * that any asynchronous errors that occur can be delivered to the correct waiters.
*/
public class AMQStateManager implements AMQMethodListener
{
@@ -84,7 +83,10 @@ public class AMQStateManager implements AMQMethodListener
public AMQState getCurrentState()
{
- return _currentState;
+ synchronized (_stateLock)
+ {
+ return _currentState;
+ }
}
public void changeState(AMQState newState)
@@ -114,7 +116,7 @@ public class AMQStateManager implements AMQMethodListener
}
/**
- * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
+ * Setting of the ProtocolSession will be required when Failover has been successfully completed.
*
* The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
* connection to the network.
@@ -131,9 +133,9 @@ public class AMQStateManager implements AMQMethodListener
}
/**
- * Propogate error to waiters
+ * Propagate error to waiters
*
- * @param error The error to propogate.
+ * @param error The error to propagate.
*/
public void error(Exception error)
{
@@ -177,7 +179,7 @@ public class AMQStateManager implements AMQMethodListener
}
/**
- * Create and add a new waiter to the notifcation list.
+ * Create and add a new waiter to the notification list.
*
* @param states The waiter will attempt to wait for one of these desired set states to be achived.
*
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
index 79f438d35d..732480e1c9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
@@ -34,7 +34,7 @@ import java.util.Set;
*
* On construction the current state and a set of States to await for is provided.
*
- * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is
+ * When await() is called the state at construction is compared against the awaitStates. If the state at construction is
* a desired state then await() returns immediately.
*
* Otherwise it will block for the set timeout for a desired state to be achieved.
@@ -48,9 +48,9 @@ public class StateWaiter extends BlockingWaiter<AMQState>
{
private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
- Set<AMQState> _awaitStates;
- private AMQState _startState;
- private AMQStateManager _stateManager;
+ private final Set<AMQState> _awaitStates;
+ private final AMQState _startState;
+ private final AMQStateManager _stateManager;
/**
*
@@ -78,9 +78,9 @@ public class StateWaiter extends BlockingWaiter<AMQState>
}
/**
- * Await for the requried State to be achieved within the default timeout.
+ * Await for the required State to be achieved within the default timeout.
* @return The achieved state that was requested.
- * @throws AMQException The exception that prevented the required state from being achived.
+ * @throws AMQException The exception that prevented the required state from being achieved.
*/
public AMQState await() throws AMQException
{
@@ -88,13 +88,13 @@ public class StateWaiter extends BlockingWaiter<AMQState>
}
/**
- * Await for the requried State to be achieved.
+ * Await for the required State to be achieved.
*
* <b>It is the responsibility of this class to remove the waiter from the StateManager
*
- * @param timeout The time in milliseconds to wait for any of the states to be achived.
+ * @param timeout The time in milliseconds to wait for any of the states to be achieved.
* @return The achieved state that was requested.
- * @throws AMQException The exception that prevented the required state from being achived.
+ * @throws AMQException The exception that prevented the required state from being achieved.
*/
public AMQState await(long timeout) throws AMQException
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 208658a5ff..bec41644fc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -28,9 +28,8 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* BlockingWaiter is a 'rendezvous' which delegates handling of
@@ -64,6 +63,8 @@ import org.apache.qpid.protocol.AMQMethodListener;
*/
public abstract class BlockingWaiter<T>
{
+ private static final Logger _logger = LoggerFactory.getLogger(BlockingWaiter.class);
+
/** This flag is used to indicate that the blocked for method has been received. */
private volatile boolean _ready = false;
@@ -180,7 +181,7 @@ public abstract class BlockingWaiter<T>
}
catch (InterruptedException e)
{
- System.err.println(e.getMessage());
+ _logger.error(e.getMessage(), e);
// IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
// if (!_ready && timeout != -1)
// {
@@ -228,12 +229,12 @@ public abstract class BlockingWaiter<T>
}
/**
- * This is a callback, called when an error has occured that should interupt any waiter.
+ * This is a callback, called when an error has occurred that should interrupt any waiter.
* It is also called from within this class to avoid code repetition but it should only be called by the MINA threads.
*
* Once closed any notification of an exception will be ignored.
*
- * @param e The exception being propogated.
+ * @param e The exception being propagated.
*/
public void error(Exception e)
{
@@ -255,7 +256,7 @@ public abstract class BlockingWaiter<T>
}
else
{
- System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
+ _logger.error("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
}
if (_waiting.get())
@@ -272,7 +273,7 @@ public abstract class BlockingWaiter<T>
}
catch (InterruptedException e1)
{
- System.err.println(e.getMessage());
+ _logger.error(e1.getMessage(), e1);
}
}
_errorAck = false;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
index b830c377b8..4ad917fa83 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
@@ -51,7 +51,4 @@ public interface MessageProducer extends javax.jms.MessageProducer
int priority, long timeToLive, boolean mandatory, boolean immediate)
throws JMSException;
- void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException;
-
}