diff options
author | Aidan Skinner <aidan@apache.org> | 2008-04-18 22:18:05 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-04-18 22:18:05 +0000 |
commit | a994fd17e31e26de4daaf86fbd8096a90e8bb3bf (patch) | |
tree | 0dfc34675f0d3e3d78997f920decb7babaca375f | |
parent | 1342bb533040492cb85b638267023d9ad3a6d72e (diff) | |
download | qpid-python-a994fd17e31e26de4daaf86fbd8096a90e8bb3bf.tar.gz |
QPID-832 fix some compile errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@649711 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 55 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 54 |
2 files changed, 14 insertions, 95 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d67443a5b7..ec3e7838f5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -189,51 +189,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _fastAccessConsumers[i] = null; } } - - - public void acknowledgeDelivered() - { - - for(int i = 0; i<16; i++) - { - final BasicMessageConsumer c = _fastAccessConsumers[i]; - if(c != null) - { - c.acknowledgeDelivered(); - } - } - if(!_slowAccessConsumers.isEmpty()) - { - for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();) - { - i.next().acknowledgeDelivered(); - } - } - } - - public void acknowledge() throws JMSException - { - for(int i = 0; i<16; i++) - { - final BasicMessageConsumer c = _fastAccessConsumers[i]; - if(c != null) - { - c.acknowledge(); - } - } - if(!_slowAccessConsumers.isEmpty()) - { - for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();) - { - i.next().acknowledge(); - } - } - } } - - - /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -555,14 +512,22 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @throws IllegalStateException If the session is closed. */ - public void acknowledge() throws JMSException + public void acknowledge() throws IllegalStateException { if (isClosed()) { throw new IllegalStateException("Session is already closed"); } - _consumers.acknowledge(); + while (true) + { + Long tag = _unacknowledgedMessageTags.poll(); + if (tag == null) + { + break; + } + acknowledgeMessage(tag, false); + } } /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index a345e55d8b..2e9aea3dcb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -285,29 +285,11 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { - // TGM FIXME not sure messages are being dealt with right - switch (_session.getAcknowledgeMode()) + if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { - case Session.DUPS_OK_ACKNOWLEDGE: - case Session.CLIENT_ACKNOWLEDGE: - _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); - break; - - case Session.SESSION_TRANSACTED: - if (isNoConsume()) - { - _session.acknowledgeMessage(jmsMsg.getDeliveryTag(), false); - } - else - { - _logger.info("Recording tag for commit:" + jmsMsg.getDeliveryTag()); - _receivedDeliveryTags.add(jmsMsg.getDeliveryTag()); - } - - break; _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); } - + _session.setInRecovery(false); } @@ -945,35 +927,6 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } } - public void acknowledge() throws JMSException - { - if (isClosed()) - { - throw new IllegalStateException("Consumer is closed"); - } - else if (_session.hasFailedOver()) - { - throw new JMSException("has failed over"); - } - else - { - Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator(); - while (tags.hasNext()) - { - _session.acknowledgeMessage(tags.next(), false); - tags.remove(); - } - } - } - - /** - * Called on recovery to reset the list of delivery tags - */ - public void clearUnackedMessages() - { - _unacknowledgedDeliveryTags.clear(); - } - public boolean isAutoClose() { return _autoClose; @@ -1092,6 +1045,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me public void failedOver() { clearReceiveQueue(); - clearUnackedMessages(); + // TGM FIXME: think this should just be removed + // clearUnackedMessages(); } } |