summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-04-18 22:18:05 +0000
committerAidan Skinner <aidan@apache.org>2008-04-18 22:18:05 +0000
commita994fd17e31e26de4daaf86fbd8096a90e8bb3bf (patch)
tree0dfc34675f0d3e3d78997f920decb7babaca375f
parent1342bb533040492cb85b638267023d9ad3a6d72e (diff)
downloadqpid-python-a994fd17e31e26de4daaf86fbd8096a90e8bb3bf.tar.gz
QPID-832 fix some compile errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@649711 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java55
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java54
2 files changed, 14 insertions, 95 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d67443a5b7..ec3e7838f5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -189,51 +189,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_fastAccessConsumers[i] = null;
}
}
-
-
- public void acknowledgeDelivered()
- {
-
- for(int i = 0; i<16; i++)
- {
- final BasicMessageConsumer c = _fastAccessConsumers[i];
- if(c != null)
- {
- c.acknowledgeDelivered();
- }
- }
- if(!_slowAccessConsumers.isEmpty())
- {
- for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();)
- {
- i.next().acknowledgeDelivered();
- }
- }
- }
-
- public void acknowledge() throws JMSException
- {
- for(int i = 0; i<16; i++)
- {
- final BasicMessageConsumer c = _fastAccessConsumers[i];
- if(c != null)
- {
- c.acknowledge();
- }
- }
- if(!_slowAccessConsumers.isEmpty())
- {
- for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();)
- {
- i.next().acknowledge();
- }
- }
- }
}
-
-
-
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -555,14 +512,22 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @throws IllegalStateException If the session is closed.
*/
- public void acknowledge() throws JMSException
+ public void acknowledge() throws IllegalStateException
{
if (isClosed())
{
throw new IllegalStateException("Session is already closed");
}
- _consumers.acknowledge();
+ while (true)
+ {
+ Long tag = _unacknowledgedMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+ acknowledgeMessage(tag, false);
+ }
}
/**
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index a345e55d8b..2e9aea3dcb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -285,29 +285,11 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
{
- // TGM FIXME not sure messages are being dealt with right
- switch (_session.getAcknowledgeMode())
+ if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
- case Session.DUPS_OK_ACKNOWLEDGE:
- case Session.CLIENT_ACKNOWLEDGE:
- _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
- break;
-
- case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(jmsMsg.getDeliveryTag(), false);
- }
- else
- {
- _logger.info("Recording tag for commit:" + jmsMsg.getDeliveryTag());
- _receivedDeliveryTags.add(jmsMsg.getDeliveryTag());
- }
-
- break;
_session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
}
-
+
_session.setInRecovery(false);
}
@@ -945,35 +927,6 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
}
- public void acknowledge() throws JMSException
- {
- if (isClosed())
- {
- throw new IllegalStateException("Consumer is closed");
- }
- else if (_session.hasFailedOver())
- {
- throw new JMSException("has failed over");
- }
- else
- {
- Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
- while (tags.hasNext())
- {
- _session.acknowledgeMessage(tags.next(), false);
- tags.remove();
- }
- }
- }
-
- /**
- * Called on recovery to reset the list of delivery tags
- */
- public void clearUnackedMessages()
- {
- _unacknowledgedDeliveryTags.clear();
- }
-
public boolean isAutoClose()
{
return _autoClose;
@@ -1092,6 +1045,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
public void failedOver()
{
clearReceiveQueue();
- clearUnackedMessages();
+ // TGM FIXME: think this should just be removed
+ // clearUnackedMessages();
}
}