summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-12 11:03:12 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-12 11:03:12 +0000
commit01b4cfa2d93ba951d0983bc4cd4b94dd87ea9400 (patch)
tree9ee7bd19e52a62407197840f76f9ff83ebb9097b
parent83d11a1e4deb2bd5e76d893dbedfe97d4241b2ad (diff)
downloadqpid-python-01b4cfa2d93ba951d0983bc4cd4b94dd87ea9400.tar.gz
Fixed Xmx value
ConcurrentSelectorDeliveryManager : Added trace logging. Ensured messages are removed when required, rather than leaking memory. AMQSession moved exceptions in recover to wrap method rather than individual suspend calls. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@506413 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/AppliedPatches.txt1
-rw-r--r--qpid/java/broker/etc/qpid-server.conf2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java21
-rw-r--r--qpid/java/distribution/pom.xml2
6 files changed, 25 insertions, 25 deletions
diff --git a/qpid/java/AppliedPatches.txt b/qpid/java/AppliedPatches.txt
index afc7339731..ac53368036 100644
--- a/qpid/java/AppliedPatches.txt
+++ b/qpid/java/AppliedPatches.txt
@@ -1,5 +1,6 @@
-Notes-
Updated to resolve Rollback/Recover problems.
+Changed Xmx to 1024
-Latest Revision-
504112,503646,502576.499979
diff --git a/qpid/java/broker/etc/qpid-server.conf b/qpid/java/broker/etc/qpid-server.conf
index 6d31db7fa9..0e98f72a37 100644
--- a/qpid/java/broker/etc/qpid-server.conf
+++ b/qpid/java/broker/etc/qpid-server.conf
@@ -21,5 +21,5 @@ QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar
export JAVA=java \
JAVA_VM=-server \
- JAVA_MEM="-Xmx3160m -Xms512m "\
+ JAVA_MEM="-Xmx1024m -Xms512m "\
CLASSPATH=$QPID_LIBS
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 1a26bab011..9b79657575 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -230,6 +230,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+ ") from queue (" + System.identityHashCode(messageQueue) +
+ ") AMQQueue (" + System.identityHashCode(queue) + ")");
+ }
+
if (messageQueue == null)
{
// There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
@@ -276,7 +283,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
//fixme
_log.error("MEMORY LEAK: message from PreDeliveryQueue not removed from _messages");
- //_messages.remove(message);
+ //inefficient
+ _messages.remove(message);
}
}
@@ -353,7 +361,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
_log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
- " subscribers to give the message to.");
+ " subscribers to give the message to. Queued count (" + getMessageCount() + ")");
}
for (Subscription sub : _subscriptions.getSubscriptions())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 328aed81d9..5b6cab294c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -32,9 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
-/**
- * A simple message store that stores the messages in a threadsafe structure in memory.
- */
+/** A simple message store that stores the messages in a threadsafe structure in memory. */
public class MemoryMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -85,7 +83,13 @@ public class MemoryMessageStore implements MessageStore
{
_log.debug("Removing message with id " + messageId);
}
- _messageMap.remove(messageId);
+ Object o = _messageMap.remove(messageId);
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Removed message " + System.identityHashCode(o));
+ }
+
}
public void createQueue(AMQQueue queue) throws AMQException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8dce5d4494..e475270ecd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -812,15 +812,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (!isSuspended)
{
- try
- {
- suspendChannel(true);
- }
- catch (AMQException e)
- {
- throw new JMSAMQException(e);
- }
+ suspendChannel(true);
}
+
for (BasicMessageConsumer consumer : _consumers.values())
{
consumer.clearUnackedMessages();
@@ -839,17 +833,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
_dispatcher.rollback();
}
-
+
if (!isSuspended)
{
- try
- {
- suspendChannel(false);
- }
- catch (AMQException e)
- {
- throw new JMSAMQException(e);
- }
+ suspendChannel(false);
}
}
catch (AMQException e)
diff --git a/qpid/java/distribution/pom.xml b/qpid/java/distribution/pom.xml
index 8079f041c1..4bfc941cd0 100644
--- a/qpid/java/distribution/pom.xml
+++ b/qpid/java/distribution/pom.xml
@@ -38,7 +38,7 @@
<java.source.version>1.5</java.source.version>
<qpid.version>${pom.version}</qpid.version>
<qpid.targetDir>${project.build.directory}</qpid.targetDir>
- <release.revision>-r506403</release.revision>
+ <release.revision>-r506413</release.revision>
</properties>
<repositories>