diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-12 11:03:12 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-12 11:03:12 +0000 |
commit | 01b4cfa2d93ba951d0983bc4cd4b94dd87ea9400 (patch) | |
tree | 9ee7bd19e52a62407197840f76f9ff83ebb9097b | |
parent | 83d11a1e4deb2bd5e76d893dbedfe97d4241b2ad (diff) | |
download | qpid-python-01b4cfa2d93ba951d0983bc4cd4b94dd87ea9400.tar.gz |
Fixed Xmx value
ConcurrentSelectorDeliveryManager : Added trace logging. Ensured messages are removed when required, rather than leaking memory.
AMQSession moved exceptions in recover to wrap method rather than individual suspend calls.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@506413 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 25 insertions, 25 deletions
diff --git a/qpid/java/AppliedPatches.txt b/qpid/java/AppliedPatches.txt index afc7339731..ac53368036 100644 --- a/qpid/java/AppliedPatches.txt +++ b/qpid/java/AppliedPatches.txt @@ -1,5 +1,6 @@ -Notes- Updated to resolve Rollback/Recover problems. +Changed Xmx to 1024 -Latest Revision- 504112,503646,502576.499979 diff --git a/qpid/java/broker/etc/qpid-server.conf b/qpid/java/broker/etc/qpid-server.conf index 6d31db7fa9..0e98f72a37 100644 --- a/qpid/java/broker/etc/qpid-server.conf +++ b/qpid/java/broker/etc/qpid-server.conf @@ -21,5 +21,5 @@ QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar export JAVA=java \ JAVA_VM=-server \ - JAVA_MEM="-Xmx3160m -Xms512m "\ + JAVA_MEM="-Xmx1024m -Xms512m "\ CLASSPATH=$QPID_LIBS diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 1a26bab011..9b79657575 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -230,6 +230,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages); + if (_log.isTraceEnabled()) + { + _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) + + ") from queue (" + System.identityHashCode(messageQueue) + + ") AMQQueue (" + System.identityHashCode(queue) + ")"); + } + if (messageQueue == null) { // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector @@ -276,7 +283,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { //fixme _log.error("MEMORY LEAK: message from PreDeliveryQueue not removed from _messages"); - //_messages.remove(message); + //inefficient + _messages.remove(message); } } @@ -353,7 +361,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + - " subscribers to give the message to."); + " subscribers to give the message to. Queued count (" + getMessageCount() + ")"); } for (Subscription sub : _subscriptions.getSubscriptions()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 328aed81d9..5b6cab294c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -32,9 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; -/** - * A simple message store that stores the messages in a threadsafe structure in memory. - */ +/** A simple message store that stores the messages in a threadsafe structure in memory. */ public class MemoryMessageStore implements MessageStore { private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -85,7 +83,13 @@ public class MemoryMessageStore implements MessageStore { _log.debug("Removing message with id " + messageId); } - _messageMap.remove(messageId); + Object o = _messageMap.remove(messageId); + + if (_log.isDebugEnabled()) + { + _log.debug("Removed message " + System.identityHashCode(o)); + } + } public void createQueue(AMQQueue queue) throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8dce5d4494..e475270ecd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -812,15 +812,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (!isSuspended) { - try - { - suspendChannel(true); - } - catch (AMQException e) - { - throw new JMSAMQException(e); - } + suspendChannel(true); } + for (BasicMessageConsumer consumer : _consumers.values()) { consumer.clearUnackedMessages(); @@ -839,17 +833,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _dispatcher.rollback(); } - + if (!isSuspended) { - try - { - suspendChannel(false); - } - catch (AMQException e) - { - throw new JMSAMQException(e); - } + suspendChannel(false); } } catch (AMQException e) diff --git a/qpid/java/distribution/pom.xml b/qpid/java/distribution/pom.xml index 8079f041c1..4bfc941cd0 100644 --- a/qpid/java/distribution/pom.xml +++ b/qpid/java/distribution/pom.xml @@ -38,7 +38,7 @@ <java.source.version>1.5</java.source.version> <qpid.version>${pom.version}</qpid.version> <qpid.targetDir>${project.build.directory}</qpid.targetDir> - <release.revision>-r506403</release.revision> + <release.revision>-r506413</release.revision> </properties> <repositories> |