summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-03-01 15:50:27 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-03-01 15:50:27 +0000
commit92be7e8f3163c048a8642d2deeaa921bbb65dc9c (patch)
tree5c0fa12fa0787d8870eab113469f74ccfb07a5be
parente78f6a9e59098ac104892a79b74c9895272b292e (diff)
downloadqpid-python-92be7e8f3163c048a8642d2deeaa921bbb65dc9c.tar.gz
NO-JIRA: [AMQP 1-0 Sandbox] merging from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1295635 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java171
-rw-r--r--qpid/java/broker/src/main/java/log4j.properties24
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java93
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java1
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java19
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java72
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java50
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java121
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java72
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java329
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java5
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java7
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java19
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java43
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java5
-rw-r--r--qpid/java/client/test/example_build.xml2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java25
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java87
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java9
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java3
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java5
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java17
-rw-r--r--qpid/java/module.xml4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java121
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java12
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java324
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java68
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java272
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java57
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java113
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java98
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java11
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes6
-rw-r--r--qpid/java/test-profiles/JavaPre010Excludes4
71 files changed, 1754 insertions, 750 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 2e2d2f0b11..8884a99923 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -30,12 +30,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.bind.tuple.StringBinding;
import com.sleepycat.je.*;
@@ -94,6 +96,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
+ private static final int LOCK_RETRY_ATTEMPTS = 5;
+
static final int DATABASE_FORMAT_VERSION = 5;
private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
@@ -893,103 +897,133 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
// _log.debug("public void removeMessage(Long messageId = " + messageId): called");
-
+ boolean complete = false;
com.sleepycat.je.Transaction tx = null;
- Cursor cursor = null;
+ Random rand = null;
+ int attempts = 0;
try
{
- tx = _environment.beginTransaction(null, null);
-
- //remove the message meta data from the store
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
-
- if (_log.isDebugEnabled())
+ do
{
- _log.debug("Removing message id " + messageId);
- }
+ tx = null;
+ try
+ {
+ tx = _environment.beginTransaction(null, null);
+ //remove the message meta data from the store
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
- OperationStatus status = _messageMetaDataDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
- messageId);
- }
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Removing message id " + messageId);
+ }
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted metadata for message " + messageId);
- }
- //now remove the content data from the store if there is any.
+ OperationStatus status = _messageMetaDataDb.delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
+ messageId);
+ }
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted metadata for message " + messageId);
+ }
- TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+ //now remove the content data from the store if there is any.
- //Use a partial record for the value to prevent retrieving the
- //data itself as we only need the key to identify what to remove.
- DatabaseEntry value = new DatabaseEntry();
- value.setPartial(0, 0, true);
- cursor = _messageContentDb.openCursor(tx, null);
- status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
- while (status == OperationStatus.SUCCESS)
- {
- mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
+ int offset = 0;
+ do
+ {
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ MessageContentKey_5 mck = new MessageContentKey_5(messageId,offset);
+ TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
+ contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+ //Use a partial record for the value to prevent retrieving the
+ //data itself as we only need the key to identify what to remove.
+ DatabaseEntry value = new DatabaseEntry();
+ value.setPartial(0, 4, true);
+
+ status = _messageContentDb.get(null,contentKeyEntry, value, LockMode.READ_COMMITTED);
+
+ if(status == OperationStatus.SUCCESS)
+ {
- if(mck.getMessageId() != messageId)
- {
- //we have exhausted all chunks for this message id, break
- break;
+ offset += IntegerBinding.entryToInt(value);
+ _messageContentDb.delete(tx, contentKeyEntry);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
+ }
+ }
+ }
+ while (status == OperationStatus.SUCCESS);
+
+ commit(tx, sync);
+ complete = true;
+ tx = null;
}
- else
+ catch (LockConflictException e)
{
- status = cursor.delete();
+ try
+ {
+ if(tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch(DatabaseException e2)
+ {
+ _log.warn("Unable to abort transaction after LockConflictExcption", e2);
+ // rethrow the original log conflict exception, the secondary exception should already have
+ // been logged.
+ throw e;
+ }
+
- if(status == OperationStatus.NOTFOUND)
+ _log.warn("Lock timeout exception. Retrying (attempt "
+ + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
+
+ if(++attempts < LOCK_RETRY_ATTEMPTS)
{
- cursor.close();
- cursor = null;
+ if(rand == null)
+ {
+ rand = new Random();
+ }
- tx.abort();
- throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
- }
+ try
+ {
+ Thread.sleep(500l + (long)(500l * rand.nextDouble()));
+ }
+ catch (InterruptedException e1)
+ {
- if (_log.isDebugEnabled())
+ }
+ }
+ else
{
- _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
+ // rethrow the lock conflict exception since we could not solve by retrying
+ throw e;
}
}
-
- status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
-
- cursor.close();
- cursor = null;
-
- commit(tx, sync);
+ while(!complete);
}
catch (DatabaseException e)
{
- e.printStackTrace();
+ _log.error("Unexpected BDB exception", e);
if (tx != null)
{
try
{
- if(cursor != null)
- {
- cursor.close();
- cursor = null;
- }
-
tx.abort();
+ tx = null;
}
catch (DatabaseException e1)
{
@@ -1001,15 +1035,16 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
finally
{
- if(cursor != null)
+ if (tx != null)
{
try
{
- cursor.close();
+ tx.abort();
+ tx = null;
}
- catch (DatabaseException e)
+ catch (DatabaseException e1)
{
- throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e);
+ throw new AMQStoreException("Error aborting transaction " + e1, e1);
}
}
}
@@ -2073,7 +2108,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
// RHM-7 Periodically wake up and check, just in case we
// missed a notification. Don't want to lock the broker hard.
- _lock.wait(250);
+ _lock.wait(1000);
}
catch (InterruptedException e)
{
diff --git a/qpid/java/broker/src/main/java/log4j.properties b/qpid/java/broker/src/main/java/log4j.properties
deleted file mode 100644
index 6788c65463..0000000000
--- a/qpid/java/broker/src/main/java/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-log4j.rootCategory=${amqj.logging.level}, console
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=all
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 873c846258..c0ecbb3630 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -74,27 +74,20 @@ import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.SubscriptionImpl;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-public class AMQChannel implements SessionConfig, AMQSessionModel
+public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -133,6 +126,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private final MessageStore _messageStore;
+ private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
+
+ private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
+
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
// Set of messages being acknoweledged in the current transaction
@@ -185,7 +182,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_messageStore = messageStore;
// by default the session is non-transactional
- _transaction = new AutoCommitTransaction(_messageStore);
+ _transaction = new AsyncAutoCommitTransaction(_messageStore, this);
_clientDeliveryMethod = session.createDeliveryMethod(_channelId);
}
@@ -204,14 +201,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
public boolean isTransactional()
{
- // this does not look great but there should only be one "non-transactional"
- // transactional context, while there could be several transactional ones in
- // theory
- return !(_transaction instanceof AutoCommitTransaction);
+ return _transaction.isTransactional();
}
public void receivedComplete()
{
+ sync();
}
@@ -267,7 +262,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException
{
- if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), info.getRoutingKey().asString(), e.getName()))
+ String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
+ if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), routingKey, e.getName()))
{
throw new AMQSecurityException("Permission denied: " + e.getName());
}
@@ -1562,4 +1558,69 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
}
+
+ public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ {
+ _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
+ }
+
+ public void completeAsyncCommands()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
+ {
+ cmd.complete();
+ _unfinishedCommandsQueue.poll();
+ }
+ while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
+ {
+ cmd = _unfinishedCommandsQueue.poll();
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+
+ public void sync()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.poll()) != null)
+ {
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+ private static class AsyncCommand
+ {
+ private final MessageStore.StoreFuture _future;
+ private ServerTransaction.Action _action;
+
+ public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ {
+ _future = future;
+ _action = action;
+ }
+
+ void awaitReadyForCompletion()
+ {
+ _future.waitForCompletion();
+ }
+
+ void complete()
+ {
+ if(!_future.isComplete())
+ {
+ _future.waitForCompletion();
+ }
+ _action.postCommit();
+ _action = null;
+ }
+
+ boolean isReadyForCompletion()
+ {
+ return _future.isComplete();
+ }
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index 5c1814590c..e3d8747d72 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -417,12 +417,12 @@ public class Broker
else
{
System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
- System.err.println("Using the fallback internal log4j.properties configuration");
+ System.err.println("Using the fallback internal fallback-log4j.properties configuration");
- InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties");
+ InputStream propsFile = this.getClass().getResourceAsStream("/fallback-log4j.properties");
if(propsFile == null)
{
- throw new IOException("Unable to load the fallback internal log4j.properties configuration file");
+ throw new IOException("Unable to load the fallback internal fallback-log4j.properties configuration file");
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
index d587ef0c16..1b0168df56 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
@@ -24,6 +24,7 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -52,6 +53,11 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ
public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
MethodRegistry methodRegistry = session.getMethodRegistry();
@@ -71,7 +77,7 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ
throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
}
-
+ channel.sync();
session.writeFrame(response.generateFrame(channelId));
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
index 29054f55c1..bc2a2dca04 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -68,6 +68,7 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
{
MethodRegistry methodRegistry = session.getMethodRegistry();
BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag());
+ channel.sync();
session.writeFrame(cancelOkBody.generateFrame(channelId));
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 8875f21d0b..a1cfb14753 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -60,6 +60,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
else
{
+ channel.sync();
if (_logger.isDebugEnabled())
{
_logger.debug("BasicConsume: from '" + body.getQueue() +
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index bbb009003c..2073299467 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -75,6 +75,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
else
{
+ channel.sync();
AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
if (queue == null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
index dd3281c65f..2cf043dd26 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
@@ -46,7 +46,7 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
{
throw body.getChannelNotFoundException(channelId);
}
-
+ channel.sync();
channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index c7842cd643..429217321c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -65,6 +65,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
{
MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) session.getMethodRegistry();
AMQMethodBody recoverOk = methodRegistry.createBasicRecoverOkBody();
+ channel.sync();
session.writeFrame(recoverOk.generateFrame(channelId));
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
index f9feada6fe..1e2a83b922 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
@@ -59,7 +59,7 @@ public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<B
{
throw body.getChannelNotFoundException(channelId);
}
-
+ channel.sync();
channel.resend(body.getRequeue());
// Qpid 0-8 hacks a synchronous -ok onto recover.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 32aa99534b..ecffd1b9cb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -65,6 +65,7 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
{
throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
}
+ channel.sync();
session.closeChannel(channelId);
// Client requested closure so we don't wait for ok we send it
stateManager.getProtocolSession().closeChannelOk(channelId);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
index 5ccaa49de8..365c8bd9c6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -55,6 +55,7 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
{
throw body.getChannelNotFoundException(channelId);
}
+ channel.sync();
channel.setSuspended(!body.getActive());
_logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index 21aea1510b..53835f381f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -69,7 +70,12 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MethodRegistry methodRegistry = session.getMethodRegistry();
-
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+ channel.sync();
AMQShortString exchangeName = body.getExchange();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index 98a0d33487..69cf0c9e20 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -55,6 +56,11 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
if (_logger.isDebugEnabled())
{
@@ -102,6 +108,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
MethodRegistry methodRegistry = session.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ channel.sync();
session.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index 586aaf9336..339085691f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -49,7 +50,12 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
-
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+ channel.sync();
try
{
if(exchangeRegistry.getExchange(body.getExchange()) == null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 0eb69e4b16..bb979d5441 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -64,18 +64,18 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
VirtualHost virtualHost = protocolConnection.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
final AMQQueue queue;
final AMQShortString routingKey;
if (body.getQueue() == null)
{
- AMQChannel channel = protocolConnection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
queue = channel.getDefaultQueue();
@@ -150,6 +150,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
}
if (!body.getNowait())
{
+ channel.sync();
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
protocolConnection.writeFrame(responseBody.generateFrame(channelId));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 693b316607..32cd1c2d08 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -197,6 +197,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if (!body.getNowait())
{
+ channel.sync();
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
QueueDeclareOkBody responseBody =
methodRegistry.createQueueDeclareOkBody(queueName,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 902e3ade85..107e485275 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -71,7 +71,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
{
throw body.getChannelNotFoundException(channelId);
}
-
+ channel.sync();
AMQQueue queue;
if (body.getQueue() == null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index 6c3e11be5b..7d609f9064 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -109,7 +109,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
if(!body.getNowait())
{
-
+ channel.sync();
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
protocolConnection.writeFrame(responseBody.generateFrame(channelId));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
index 3849c5af19..9915627a94 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
@@ -132,6 +132,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
// 0-8 does not support QueueUnbind
throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + session.getProtocolVersion(), null);
}
+ channel.sync();
session.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
index 483bca894e..fa06a99204 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
@@ -20,22 +20,23 @@
*/
package org.apache.qpid.server.output;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.*;
import org.apache.qpid.AMQPInvalidClassException;
+import java.util.HashMap;
import java.util.Map;
public class HeaderPropertiesConverter
{
- public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage)
+ public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage, VirtualHost vhost)
{
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
@@ -82,11 +83,23 @@ public class HeaderPropertiesConverter
}
if(messageProps.hasMessageId())
{
- props.setMessageId(messageProps.getMessageId().toString());
+ props.setMessageId("ID:" + messageProps.getMessageId().toString());
}
+ if(messageProps.hasReplyTo())
+ {
+ ReplyTo replyTo = messageProps.getReplyTo();
+ String exchangeName = replyTo.getExchange();
+ String routingKey = replyTo.getRoutingKey();
+ if(exchangeName == null)
+ {
+ exchangeName = "";
+ }
- // TODO Reply-to
+ Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
+ String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString();
+ props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'"));
+ }
if(messageProps.hasUserId())
{
props.setUserId(new AMQShortString(messageProps.getUserId()));
@@ -94,7 +107,12 @@ public class HeaderPropertiesConverter
if(messageProps.hasApplicationHeaders())
{
- Map<String, Object> appHeaders = messageProps.getApplicationHeaders();
+ Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders());
+ if(messageProps.getApplicationHeaders().containsKey("x-jms-type"))
+ {
+ props.setType(String.valueOf(appHeaders.remove("x-jms-type")));
+ }
+
FieldTable ft = new FieldTable();
for(Map.Entry<String, Object> entry : appHeaders.entrySet())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index efd904f6aa..1e62e5e9ca 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -91,7 +91,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
else
{
final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost());
ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
chb.bodySize = message.getSize();
return chb;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index 010afcb1a9..78507b0cf2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -86,7 +86,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
else
{
final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost());
ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
chb.bodySize = message.getSize();
return chb;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
index 5e2b3e4556..9102b6c651 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
@@ -85,7 +85,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
else
{
final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost());
ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
chb.bodySize = message.getSize();
return chb;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 241ab3fcb9..547f2440db 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -1069,7 +1069,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
try
{
- closeSession();
+ try
+ {
+ closeSession();
+ }
+ finally
+ {
+ closeProtocolSession();
+ }
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 4faca15eb0..dfad9157c5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -275,10 +275,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
{
- String defaultGroup = String.valueOf(arguments.get(QPID_DEFAULT_MESSAGE_GROUP));
+ Object defaultGroup = arguments.get(QPID_DEFAULT_MESSAGE_GROUP);
_messageGroupManager =
new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)),
- defaultGroup == null ? QPID_NO_GROUP : defaultGroup,
+ defaultGroup == null ? QPID_NO_GROUP : defaultGroup.toString(),
this);
}
else
@@ -2255,9 +2255,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public boolean equals(Object o)
{
- assert o != null;
- assert o instanceof QueueEntryListener;
- return _sub == ((QueueEntryListener) o)._sub;
+ return o != null
+ && o instanceof SimpleAMQQueue.QueueEntryListener
+ && _sub == ((QueueEntryListener) o)._sub;
}
public int hashCode()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index 3f02442704..865b3d1f48 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.queue;
import java.util.Map;
@@ -7,7 +26,11 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
public class SortedQueue extends OutOfOrderQueue
{
- private String _sortedPropertyName;
+ //Lock object to synchronize enqueue. Used instead of the object
+ //monitor to prevent lock order issues with subscription sendLocks
+ //and consumer updates in the super classes
+ private final Object _sortedQueueLock = new Object();
+ private final String _sortedPropertyName;
protected SortedQueue(final String name, final boolean durable,
final String owner, final boolean autoDelete, final boolean exclusive,
@@ -23,8 +46,11 @@ public class SortedQueue extends OutOfOrderQueue
return _sortedPropertyName;
}
- public synchronized void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
- super.enqueue(message, action);
+ synchronized (_sortedQueueLock)
+ {
+ super.enqueue(message, action);
+ }
}
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
index 7a70795e77..87c79178f0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.queue;
public class SortedQueueEntryListFactory implements QueueEntryListFactory
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
index 42818db214..689e48b4cf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
@@ -172,7 +172,6 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
EntryFinder visitor = new EntryFinder(sub);
sub.getQueue().visit(visitor);
- _logger.debug("Earliest available entry for " + sub + " is " + visitor.getEntry() + (visitor.getEntry() == null ? "" : " : " + getKey(visitor.getEntry())));
return visitor.getEntry();
}
@@ -250,12 +249,10 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
if(newState == QueueEntry.State.ACQUIRED)
{
- _logger.debug("Adding to " + _group);
_group.add();
}
else if(oldState == QueueEntry.State.ACQUIRED)
{
- _logger.debug("Subtracting from " + _group);
_group.subtract();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 1e36119ce8..bde756dd03 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -51,23 +51,15 @@ import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageDeliveryPriority;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.*;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.url.AMQBindingURL;
+import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collections;
@@ -85,7 +77,6 @@ import java.nio.ByteBuffer;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
{
-
private final long _subscriptionID;
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
@@ -450,7 +441,53 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
}
- // TODO - ReplyTo
+ if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0)
+ {
+ String origReplyToString = properties.getReplyTo().asString();
+ ReplyTo replyTo = new ReplyTo();
+ // if the string looks like a binding URL, then attempt to parse it...
+ try
+ {
+ AMQBindingURL burl = new AMQBindingURL(origReplyToString);
+ AMQShortString routingKey = burl.getRoutingKey();
+ if(routingKey != null)
+ {
+ replyTo.setRoutingKey(routingKey.asString());
+ }
+
+ AMQShortString exchangeName = burl.getExchangeName();
+ if(exchangeName != null)
+ {
+ replyTo.setExchange(exchangeName.asString());
+ }
+ }
+ catch (URISyntaxException e)
+ {
+ replyTo.setRoutingKey(origReplyToString);
+ }
+ messageProps.setReplyTo(replyTo);
+
+ }
+
+ if(properties.getMessageId() != null)
+ {
+ try
+ {
+ String messageIdAsString = properties.getMessageIdAsString();
+ if(messageIdAsString.startsWith("ID:"))
+ {
+ messageIdAsString = messageIdAsString.substring(3);
+ }
+ UUID uuid = UUID.fromString(messageIdAsString);
+ messageProps.setMessageId(uuid);
+ }
+ catch(IllegalArgumentException e)
+ {
+ // ignore - can't parse
+ }
+ }
+
+
if(properties.getUserId() != null)
{
@@ -459,7 +496,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
FieldTable fieldTable = properties.getHeaders();
- final Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+ Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+
+ if(properties.getType() != null)
+ {
+ appHeaders.put("x-jms-type", properties.getTypeAsString());
+ }
messageProps.setApplicationHeaders(appHeaders);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index ab07ed20f6..04cdbf2b25 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -62,6 +62,8 @@ import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ServerConnection extends Connection implements Managable, AMQConnectionModel, LogSubject, AuthorizationHolder
{
@@ -123,6 +125,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
{
_virtualHost.getConnectionRegistry().deregisterConnection(this);
}
+ unregisterConnectionMbean();
}
if (state == State.CLOSED)
@@ -161,15 +164,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
initialiseStatistics();
- try
- {
- _mBean = new ServerConnectionMBean(this);
- _mBean.register();
- }
- catch (JMException jme)
- {
- log.error("Unable to create mBean for ServerConnection",jme);
- }
+ registerConnectionMbean();
}
public void setConnectionConfig(final ConnectionConfig config)
@@ -285,11 +280,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
public void close(AMQConstant cause, String message) throws AMQException
{
closeSubscriptions();
- if (_mBean != null)
- {
- _mBean.unregister();
- _mBean = null;
- }
+ unregisterConnectionMbean();
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
try
{
@@ -433,11 +424,6 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
public void closed()
{
closeSubscriptions();
- if (_mBean != null)
- {
- _mBean.unregister();
- _mBean = null;
- }
super.closed();
}
@@ -483,4 +469,30 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
_mBean.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
}
}
+
+ private void registerConnectionMbean()
+ {
+ try
+ {
+ _mBean = new ServerConnectionMBean(this);
+ _mBean.register();
+ }
+ catch (JMException jme)
+ {
+ log.error("Unable to register mBean for ServerConnection", jme);
+ }
+ }
+
+ private void unregisterConnectionMbean()
+ {
+ if (_mBean != null)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Unregistering mBean for ServerConnection" + _mBean);
+ }
+ _mBean.unregister();
+ _mBean = null;
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 2142b2f7c3..62a1e2b0f5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -27,6 +27,7 @@ import java.security.Principal;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
@@ -63,7 +64,7 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -84,18 +85,20 @@ import org.apache.qpid.transport.SessionDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject
+public class ServerSession extends Session
+ implements AuthorizationHolder, SessionConfig,
+ AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
+ private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
private final UUID _id;
private ConnectionConfig _connectionConfig;
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
- private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
@@ -147,7 +150,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
super(connection, delegate, name, expiry);
_connectionConfig = connConfig;
- _transaction = new AutoCommitTransaction(this.getMessageStore());
+ _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
_logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
@@ -184,16 +187,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- PostEnqueueAction postTransactionAction;
- if(isTransactional())
- {
- postTransactionAction = new PostEnqueueAction(queues, message) ;
- }
- else
- {
- postTransactionAction = _postEnqueueAction;
- postTransactionAction.setState(queues, message);
- }
+ PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
_transaction.enqueue(queues,message, postTransactionAction, 0L);
incrementOutstandingTxnsIfNecessary();
updateTransactionalActivity();
@@ -221,12 +215,12 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void accept(RangeSet ranges)
{
dispositionChange(ranges, new MessageDispositionAction()
- {
- public void performAction(MessageDispositionChangeListener listener)
- {
- listener.onAccept();
- }
- });
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onAccept();
+ }
+ });
}
@@ -444,10 +438,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public boolean isTransactional()
{
- // this does not look great but there should only be one "non-transactional"
- // transactional context, while there could be several transactional ones in
- // theory
- return !(_transaction instanceof AutoCommitTransaction);
+ return _transaction.isTransactional();
}
public boolean inTransaction()
@@ -765,6 +756,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
subscription_0_10.flushCreditState(false);
}
+ awaitCommandCompletion();
}
private class PostEnqueueAction implements ServerTransaction.Action
@@ -774,17 +766,12 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private ServerMessage _message;
private final boolean _transactional;
- public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message)
+ public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional)
{
- _transactional = true;
+ _transactional = transactional;
setState(queues, message);
}
- public PostEnqueueAction()
- {
- _transactional = false;
- }
-
public void setState(List<? extends BaseQueue> queues, ServerMessage message)
{
_message = message;
@@ -830,4 +817,76 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
return _blocking.get();
}
+
+ private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
+
+ public void completeAsyncCommands()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
+ {
+ cmd.complete();
+ _unfinishedCommandsQueue.poll();
+ }
+ while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
+ {
+ cmd = _unfinishedCommandsQueue.poll();
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+
+ public void awaitCommandCompletion()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.poll()) != null)
+ {
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+
+ public Object getAsyncCommandMark()
+ {
+ return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
+ }
+
+ public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ {
+ _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
+ }
+
+ private static class AsyncCommand
+ {
+ private final MessageStore.StoreFuture _future;
+ private ServerTransaction.Action _action;
+
+ public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ {
+ _future = future;
+ _action = action;
+ }
+
+ void awaitReadyForCompletion()
+ {
+ _future.waitForCompletion();
+ }
+
+ void complete()
+ {
+ if(!_future.isComplete())
+ {
+ _future.waitForCompletion();
+ }
+ _action.postCommit();
+ _action = null;
+ }
+
+ boolean isReadyForCompletion()
+ {
+ return _future.isComplete();
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index b6e142a5fd..2eab65cf8a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.transport;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -55,6 +54,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
@@ -81,9 +81,22 @@ public class ServerSessionDelegate extends SessionDelegate
if(!session.isClosing())
{
- super.command(session, method);
+ Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark();
+ super.command(session, method, false);
+ Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
+ if(newOutstanding == null || newOutstanding == asyncCommandMark)
+ {
+ session.processed(method);
+ }
+
+ if(newOutstanding != null)
+ {
+ ((ServerSession)session).completeAsyncCommands();
+ }
+
if (method.isSync())
{
+ ((ServerSession)session).awaitCommandCompletion();
session.flushProcessed();
}
}
@@ -98,7 +111,13 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageAccept(Session session, MessageAccept method)
{
- ((ServerSession)session).accept(method.getTransfers());
+ final ServerSession serverSession = (ServerSession) session;
+ serverSession.accept(method.getTransfers());
+ if(!serverSession.isTransactional())
+ {
+ serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE,
+ new CommandProcessedAction(serverSession, method));
+ }
}
@Override
@@ -252,7 +271,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
@Override
- public void messageTransfer(Session ssn, MessageTransfer xfr)
+ public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
final Exchange exchange = getExchangeForMessage(ssn, xfr);
@@ -294,12 +313,13 @@ public class ServerSessionDelegate extends SessionDelegate
exchangeInUse = exchange;
}
+ final ServerSession serverSession = (ServerSession) ssn;
if(!queues.isEmpty())
{
final MessageStore store = getVirtualHost(ssn).getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
- MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
- ((ServerSession) ssn).enqueue(message, queues);
+ MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
+ serverSession.enqueue(message, queues);
storeMessage.flushToStore();
}
else
@@ -313,13 +333,19 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
+ serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
}
}
-
- ssn.processed(xfr);
+ if(serverSession.isTransactional())
+ {
+ serverSession.processed(xfr);
+ }
+ else
+ {
+ serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
+ }
}
private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
@@ -404,6 +430,13 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
+ public void executionSync(final Session ssn, final ExecutionSync sync)
+ {
+ ((ServerSession)ssn).awaitCommandCompletion();
+ super.executionSync(ssn, sync);
+ }
+
+ @Override
public void exchangeDeclare(Session session, ExchangeDeclare method)
{
String exchangeName = method.getExchange();
@@ -1269,4 +1302,25 @@ public class ServerSessionDelegate extends SessionDelegate
final ServerConnection scon = (ServerConnection) session.getConnection();
SecurityManager.setThreadSubject(scon.getAuthorizedSubject());
}
+
+ private static class CommandProcessedAction implements ServerTransaction.Action
+ {
+ private final ServerSession _serverSession;
+ private final Method _method;
+
+ public CommandProcessedAction(final ServerSession serverSession, final Method xfr)
+ {
+ _serverSession = serverSession;
+ _method = xfr;
+ }
+
+ public void postCommit()
+ {
+ _serverSession.processed(_method);
+ }
+
+ public void onRollback()
+ {
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
new file mode 100755
index 0000000000..7e238aeadc
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -0,0 +1,329 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStore.StoreFuture;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * An implementation of ServerTransaction where each enqueue/dequeue
+ * operation takes place within it own transaction.
+ *
+ * Since there is no long-lived transaction, the commit and rollback methods of
+ * this implementation are empty.
+ */
+public class AsyncAutoCommitTransaction implements ServerTransaction
+{
+ protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class);
+
+ private final MessageStore _messageStore;
+ private final FutureRecorder _futureRecorder;
+
+ public static interface FutureRecorder
+ {
+ public void recordFuture(StoreFuture future, Action action);
+
+ }
+
+ public AsyncAutoCommitTransaction(MessageStore transactionLog, FutureRecorder recorder)
+ {
+ _messageStore = transactionLog;
+ _futureRecorder = recorder;
+ }
+
+ public long getTransactionStartTime()
+ {
+ return 0L;
+ }
+
+ /**
+ * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
+ * by the caller are executed immediately.
+ */
+ public void addPostTransactionAction(final Action immediateAction)
+ {
+ addFuture(MessageStore.IMMEDIATE_FUTURE, immediateAction);
+
+ }
+
+ public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+ MessageStore.StoreFuture future;
+ if(message.isPersistent() && queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ }
+
+ txn = _messageStore.newTransaction();
+ txn.dequeueMessage(queue, message);
+ future = txn.commitTranAsync();
+
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message dequeue", e);
+ throw new RuntimeException("Error during message dequeue", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+ }
+
+ private void addFuture(final MessageStore.StoreFuture future, final Action action)
+ {
+ if(action != null)
+ {
+ if(future.isComplete())
+ {
+ action.postCommit();
+ }
+ else
+ {
+ _futureRecorder.recordFuture(future, action);
+ }
+ }
+ }
+
+ public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+ for(QueueEntry entry : queueEntries)
+ {
+ ServerMessage message = entry.getMessage();
+ BaseQueue queue = entry.getQueue();
+
+ if(message.isPersistent() && queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ }
+
+ if(txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+
+ txn.dequeueMessage(queue, message);
+ }
+
+ }
+ MessageStore.StoreFuture future;
+ if(txn != null)
+ {
+ future = txn.commitTranAsync();
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message dequeues", e);
+ throw new RuntimeException("Error during message dequeues", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+ }
+
+
+ public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+ MessageStore.StoreFuture future;
+ if(message.isPersistent() && queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ }
+
+ txn = _messageStore.newTransaction();
+ txn.enqueueMessage(queue, message);
+ future = txn.commitTranAsync();
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message enqueue", e);
+ throw new RuntimeException("Error during message enqueue", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+
+ }
+
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+
+ if(message.isPersistent())
+ {
+ for(BaseQueue queue : queues)
+ {
+ if (queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ }
+ if (txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+
+ txn.enqueueMessage(queue, message);
+
+
+ }
+ }
+
+ }
+ MessageStore.StoreFuture future;
+ if (txn != null)
+ {
+ future = txn.commitTranAsync();
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message enqueues", e);
+ throw new RuntimeException("Error during message enqueues", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+ }
+
+
+ public void commit(final Runnable immediatePostTransactionAction)
+ {
+ if(immediatePostTransactionAction != null)
+ {
+ addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
+ {
+ public void postCommit()
+ {
+ immediatePostTransactionAction.run();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+ }
+
+ public void commit()
+ {
+ }
+
+ public void rollback()
+ {
+ }
+
+ public boolean isTransactional()
+ {
+ return false;
+ }
+
+ private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
+ {
+ if (txn != null)
+ {
+ try
+ {
+ txn.abortTran();
+ }
+ catch (AMQStoreException e)
+ {
+ _logger.error("Abort transaction failed", e);
+ // Deliberate decision not to re-throw this exception. Rationale: we can only reach here if a previous
+ // TransactionLog method has ended in Exception. If we were to re-throw here, we would prevent
+ // our caller from receiving the original exception (which is likely to be more revealing of the underlying error).
+ }
+ }
+ if (postTransactionAction != null)
+ {
+ postTransactionAction.onRollback();
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index a67d4badd1..ad2a299108 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -242,6 +242,11 @@ public class AutoCommitTransaction implements ServerTransaction
{
}
+ public boolean isTransactional()
+ {
+ return false;
+ }
+
private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
{
if (txn != null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 7f5b5fb8b2..34bac0411e 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -309,7 +309,12 @@ public class LocalTransaction implements ServerTransaction
private void resetDetails()
{
_transaction = null;
- _postTransactionActions.clear();
+ _postTransactionActions.clear();
_txnStartTime = 0L;
}
+
+ public boolean isTransactional()
+ {
+ return true;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
index acdf712de9..c568ae67aa 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
@@ -41,6 +41,8 @@ import org.apache.qpid.server.queue.QueueEntry;
*/
public interface ServerTransaction
{
+
+
/**
* Represents an action to be performed on transaction commit or rollback
*/
@@ -108,4 +110,6 @@ public interface ServerTransaction
* be executed immediately after the underlying transaction has rolled-back.
*/
void rollback();
+
+ boolean isTransactional();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
index 43fb5b4cb3..99e05851ca 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index 34ad0e5668..d177993886 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.AMQMessage;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
index 999b22299c..8311aa80ce 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
@@ -40,7 +40,12 @@ import org.apache.qpid.url.BindingURL;
* to support any destination defined in AMQP 0-10 spec.
*/
public class AMQAnyDestination extends AMQDestination implements Queue, Topic
-{
+{
+ protected AMQAnyDestination()
+ {
+ super();
+ }
+
public AMQAnyDestination(BindingURL binding)
{
super(binding);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 802cc55b0e..0ded689ea6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -211,7 +211,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
+ " port: " + brokerDetail.getPort() + " vhost: "
+ _conn.getVirtualHost() + " username: "
+ _conn.getUsername() + " password: "
- + _conn.getPassword());
+ + "********");
}
ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 92602ac3a2..61fe722423 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -151,6 +151,10 @@ public abstract class AMQDestination implements Destination, Referenceable
return defaultDestSyntax;
}
+ protected AMQDestination()
+ {
+ }
+
protected AMQDestination(Address address) throws Exception
{
this._address = address;
@@ -186,6 +190,11 @@ public abstract class AMQDestination implements Destination, Referenceable
protected AMQDestination(String str) throws URISyntaxException
{
+ parseDestinationString(str);
+ }
+
+ protected void parseDestinationString(String str) throws URISyntaxException
+ {
_destSyntax = getDestType(str);
str = stripSyntaxPrefix(str);
if (_destSyntax == DestSyntax.BURL)
@@ -305,6 +314,16 @@ public abstract class AMQDestination implements Destination, Referenceable
}
}
+ public void setDestinationString(String str) throws Exception
+ {
+ parseDestinationString(str);
+ }
+
+ public String getDestinationString()
+ {
+ return toString();
+ }
+
public DestSyntax getDestSyntax()
{
return _destSyntax;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
index 5bd1bd629a..5ecb5d5913 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
@@ -30,6 +30,10 @@ import org.apache.qpid.url.BindingURL;
public class AMQQueue extends AMQDestination implements Queue
{
+ protected AMQQueue()
+ {
+ super();
+ }
public AMQQueue(String address) throws URISyntaxException
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 784b75af10..48c4e3e3e6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -371,7 +371,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
* to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
*/
- protected volatile boolean _usingDispatcherForCleanup;
+ private volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
@@ -3570,3 +3570,4 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 49b77dcc7b..8395c8f4b7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -795,43 +795,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (suspend)
{
- synchronized (getMessageDeliveryLock())
- {
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
- Option.UNRELIABLE);
- sync();
- List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
- _prefetchedMessageTags.addAll(tags);
- }
- }
-
- _usingDispatcherForCleanup = true;
- syncDispatchQueue();
- _usingDispatcherForCleanup = false;
-
- RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
- RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
- RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
- + prefetched.size());
-
- for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
- {
- Range range = deliveredIter.next();
- all.add(range);
- }
-
- for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
- {
- Range range = prefetchedIter.next();
- all.add(range);
- }
-
- flushProcessed(all, false);
- getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
- getQpidSession().messageRelease(prefetched);
- sync();
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+ Option.UNRELIABLE);
+ }
}
else
{
@@ -1387,3 +1355,4 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().sync();
}
}
+
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 780dbcafc2..5969d9a5a5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -43,6 +43,11 @@ public class AMQTopic extends AMQDestination implements Topic
super(address);
}
+ protected AMQTopic()
+ {
+ super();
+ }
+
/**
* Constructor for use in creating a topic using a BindingURL.
*
diff --git a/qpid/java/client/test/example_build.xml b/qpid/java/client/test/example_build.xml
index 329c12982c..dda3cb4263 100644
--- a/qpid/java/client/test/example_build.xml
+++ b/qpid/java/client/test/example_build.xml
@@ -73,7 +73,7 @@
</javac>
<copy todir="${example.classes}">
- <!-- copy any non java src files into the build tree, e.g. log4j.properties -->
+ <!-- copy any non java src files into the build tree, e.g. properties files -->
<fileset dir="${example.src}">
<exclude name="**/*.java"/>
<exclude name="**/package.html"/>
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java b/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java
index 2c7fe7b8ed..d9c12148cb 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java
@@ -35,41 +35,42 @@ import static org.apache.qpid.messaging.util.PyPrint.pprint;
public class Address
{
+ private String _name;
+ private String _subject;
+ private Map _options;
+ private final String _myToString;
+
public static Address parse(String address)
{
return new AddressParser(address).parse();
}
- private String name;
- private String subject;
- private Map options;
-
public Address(String name, String subject, Map options)
{
- this.name = name;
- this.subject = subject;
- this.options = options;
+ this._name = name;
+ this._subject = subject;
+ this._options = options;
+ this._myToString = String.format("%s/%s; %s", pprint(_name), pprint(_subject), pprint(_options));
}
public String getName()
{
- return name;
+ return _name;
}
public String getSubject()
{
- return subject;
+ return _subject;
}
public Map getOptions()
{
- return options;
+ return _options;
}
public String toString()
{
- return String.format("%s/%s; %s", pprint(name), pprint(subject),
- pprint(options));
+ return _myToString;
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 5a9ea73cae..3fc596d0eb 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -33,7 +33,6 @@ import static org.apache.qpid.transport.Session.State.RESUMING;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.transport.network.Frame;
-import static org.apache.qpid.transport.util.Functions.mod;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
import static org.apache.qpid.util.Serial.ge;
@@ -44,11 +43,9 @@ import static org.apache.qpid.util.Serial.max;
import static org.apache.qpid.util.Strings.toUTF8;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class Session extends SessionInvoker
{
private static final Logger log = Logger.get(Session.class);
-
+
public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
static class DefaultSessionListener implements SessionListener
@@ -113,7 +110,9 @@ public class Session extends SessionInvoker
// outgoing command count
private int commandsOut = 0;
- private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)];
+ private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024);
+ private Map<Integer,Method> commands = new HashMap<Integer, Method>();
+ private final Object commandsLock = new Object();
private int commandBytes = 0;
private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024);
private int maxComplete = commandsOut - 1;
@@ -196,7 +195,7 @@ public class Session extends SessionInvoker
public void setAutoSync(boolean value)
{
- synchronized (commands)
+ synchronized (commandsLock)
{
this.autoSync = value;
}
@@ -204,10 +203,10 @@ public class Session extends SessionInvoker
protected void setState(State state)
{
- synchronized (commands)
+ synchronized (commandsLock)
{
this.state = state;
- commands.notifyAll();
+ commandsLock.notifyAll();
}
}
@@ -276,13 +275,13 @@ public class Session extends SessionInvoker
void resume()
{
_failoverRequired.set(false);
- synchronized (commands)
+ synchronized (commandsLock)
{
attach();
for (int i = maxComplete + 1; lt(i, commandsOut); i++)
{
- Method m = commands[mod(i, commands.length)];
+ Method m = getCommand(i);
if (m == null)
{
m = new ExecutionSync();
@@ -337,11 +336,27 @@ public class Session extends SessionInvoker
}
}
+ private Method getCommand(int i)
+ {
+ return commands.get(i);
+ }
+
+ private void setCommand(int commandId, Method command)
+ {
+ commands.put(commandId, command);
+ }
+
+ private Method removeCommand(int id)
+ {
+ return commands.remove(id);
+ }
+
void dump()
{
- synchronized (commands)
+ synchronized (commandsLock)
{
- for (Method m : commands)
+ TreeMap<Integer, Method> ordered = new TreeMap<Integer, Method>(commands);
+ for (Method m : ordered.values())
{
if (m != null)
{
@@ -484,7 +499,7 @@ public class Session extends SessionInvoker
copy = processed.copy();
}
- synchronized (commands)
+ synchronized (commandsLock)
{
if (state == DETACHED || state == CLOSING || state == CLOSED)
{
@@ -539,18 +554,16 @@ public class Session extends SessionInvoker
{
log.debug("%s complete(%d, %d)", this, lower, upper);
}
- synchronized (commands)
+ synchronized (commandsLock)
{
int old = maxComplete;
for (int id = max(maxComplete, lower); le(id, upper); id++)
{
- int idx = mod(id, commands.length);
- Method m = commands[idx];
+ Method m = removeCommand(id);
if (m != null)
{
commandBytes -= m.getBodySize();
m.complete();
- commands[idx] = null;
}
}
if (le(lower, maxComplete + 1))
@@ -563,7 +576,7 @@ public class Session extends SessionInvoker
log.debug("%s commands remaining: %s", this, commandsOut - maxComplete);
}
- commands.notifyAll();
+ commandsLock.notifyAll();
return gt(maxComplete, old);
}
}
@@ -596,7 +609,7 @@ public class Session extends SessionInvoker
protected boolean isCommandsFull(int id)
{
- return id - maxComplete >= commands.length;
+ return id - maxComplete >= commandLimit;
}
public void invoke(Method m)
@@ -613,7 +626,7 @@ public class Session extends SessionInvoker
acquireCredit();
}
- synchronized (commands)
+ synchronized (commandsLock)
{
if (state == DETACHED && m.isUnreliable())
{
@@ -629,7 +642,7 @@ public class Session extends SessionInvoker
Thread current = Thread.currentThread();
if (!current.equals(resumer) )
{
- Waiter w = new Waiter(commands, timeout);
+ Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && (state != OPEN && state != CLOSED))
{
checkFailoverRequired("Command was interrupted because of failover, before being sent");
@@ -678,7 +691,7 @@ public class Session extends SessionInvoker
if (isFull(next))
{
- Waiter w = new Waiter(commands, timeout);
+ Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && isFull(next) && state != CLOSED)
{
if (state == OPEN || state == RESUMING)
@@ -735,7 +748,7 @@ public class Session extends SessionInvoker
if ((replayTransfer) || m.hasCompletionListener())
{
- commands[mod(next, commands.length)] = m;
+ setCommand(next, m);
commandBytes += m.getBodySize();
}
if (autoSync)
@@ -817,7 +830,7 @@ public class Session extends SessionInvoker
public void sync(long timeout)
{
log.debug("%s sync()", this);
- synchronized (commands)
+ synchronized (commandsLock)
{
int point = commandsOut - 1;
@@ -826,19 +839,13 @@ public class Session extends SessionInvoker
executionSync(SYNC);
}
- Waiter w = new Waiter(commands, timeout);
+ Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
checkFailoverRequired("Session sync was interrupted by failover.");
if(log.isDebugEnabled())
{
- List<Method> waitingFor =
- Arrays.asList(commands)
- .subList(mod(maxComplete,commands.length),
- mod(commandsOut-1, commands.length) < mod(maxComplete, commands.length)
- ? commands.length-1
- : mod(commandsOut-1, commands.length));
- log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, waitingFor);
+ log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
}
w.await();
}
@@ -909,7 +916,7 @@ public class Session extends SessionInvoker
protected <T> Future<T> invoke(Method m, Class<T> klass)
{
- synchronized (commands)
+ synchronized (commandsLock)
{
int command = commandsOut;
ResultFuture<T> future = new ResultFuture<T>(klass);
@@ -1019,7 +1026,7 @@ public class Session extends SessionInvoker
{
log.debug("Closing [%s] in state [%s]", this, state);
}
- synchronized (commands)
+ synchronized (commandsLock)
{
switch(state)
{
@@ -1043,7 +1050,7 @@ public class Session extends SessionInvoker
protected void awaitClose()
{
- Waiter w = new Waiter(commands, timeout);
+ Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && state != CLOSED)
{
checkFailoverRequired("close() was interrupted by failover.");
@@ -1063,7 +1070,7 @@ public class Session extends SessionInvoker
public void closed()
{
- synchronized (commands)
+ synchronized (commandsLock)
{
if (closing || getException() != null)
{
@@ -1074,7 +1081,7 @@ public class Session extends SessionInvoker
state = DETACHED;
}
- commands.notifyAll();
+ commandsLock.notifyAll();
synchronized (results)
{
@@ -1171,9 +1178,9 @@ public class Session extends SessionInvoker
//prevent them waiting for timeout for 60 seconds
//and possibly preventing failover proceeding
_failoverRequired.set(true);
- synchronized (commands)
+ synchronized (commandsLock)
{
- commands.notifyAll();
+ commandsLock.notifyAll();
}
synchronized (results)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index 028b912ba1..cabff1cd13 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -45,10 +45,15 @@ public class SessionDelegate
method.dispatch(ssn, this);
}
- public void command(Session ssn, Method method) {
+ public void command(Session ssn, Method method)
+ {
+ command(ssn, method, !method.hasPayload());
+ }
+ public void command(Session ssn, Method method, boolean processed)
+ {
ssn.identify(method);
method.dispatch(ssn, this);
- if (!method.hasPayload())
+ if (processed)
{
ssn.processed(method);
}
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java
index be129a67cc..a7b36bc98c 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java
@@ -87,8 +87,9 @@ public class ConnectionFactoryProperties
{
if (_log.isTraceEnabled())
{
- _log.trace("setConnectionURL(" + connectionURL + ")");
+ _log.trace("setConnectionURL(" + Util.maskUrlForLog(connectionURL) + ")");
}
+
_hasBeenUpdated = true;
this._connectionURL = connectionURL;
}
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
index d56f520db4..363af1bbcd 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
@@ -425,11 +425,6 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
*/
public void setConnectionURL(final String connectionURL)
{
- if (_log.isTraceEnabled())
- {
- _log.trace("setConnectionURL(" + connectionURL + ")");
- }
-
_raProperties.setConnectionURL(connectionURL);
}
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java
index b927aaa0be..3957fa9660 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java
@@ -34,8 +34,10 @@ import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.transaction.TransactionManager;
+import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.ra.admin.QpidQueue;
import org.apache.qpid.ra.admin.QpidTopic;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,4 +183,19 @@ public class Util
{
return (object == null ? "null" : object.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(object))) ;
}
+
+
+ public static String maskUrlForLog(final String url)
+ {
+ String results = null;
+
+ try
+ {
+ results = new AMQConnectionURL(url).toString();
+ }
+ catch(Exception ignore){}
+
+ return (results == null) ? url : results;
+ }
+
}
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index 349fb91e4a..a3b8606525 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -264,7 +264,7 @@
<classpath refid="module.class.path"/>
</javac>
- <!-- copy any non java src files into the build tree, e.g. log4j.properties -->
+ <!-- copy any non java src files into the build tree, e.g. properties files -->
<copy todir="${module.classes}" verbose="true">
<fileset dir="${module.src}">
<exclude name="**/*.java"/>
@@ -285,7 +285,7 @@
<classpath refid="module.test.path"/>
</javac>
- <!-- copy any non java src files into the build tree, e.g. log4j.properties -->
+ <!-- copy any non java src files into the build tree, e.g. properties files -->
<copy todir="${module.test.classes}" verbose="true">
<fileset dir="${module.test.src}">
<exclude name="**/*.java"/>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
index 5c5ad66777..d91b9b9263 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
@@ -1,24 +1,3 @@
-package org.apache.qpid.client.prefetch;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -39,6 +18,26 @@ import org.slf4j.LoggerFactory;
* under the License.
*
*/
+package org.apache.qpid.client.prefetch;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
public class PrefetchBehaviourTest extends QpidBrokerTestCase
{
protected static final Logger _logger = LoggerFactory.getLogger(PrefetchBehaviourTest.class);
@@ -132,44 +131,66 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase
//wait for the other consumer to finish to ensure it completes ok
_logger.debug("waiting for async consumer to complete");
assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS));
- assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get());
+ assertFalse("Unexpected exception during async message processing",_exceptionCaught.get());
}
/**
- * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer.
- * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them.
- * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue.
- * Try to receive all 10 messages.
+ * This test was originally known as AMQConnectionTest#testPrefetchSystemProperty.
+ *
*/
- public void testConnectionStop() throws Exception
+ public void testMessagesAreDistributedBetweenConsumersWithLowPrefetch() throws Exception
{
- setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10");
- Connection con = getConnection();
- con.start();
- Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");
-
- MessageProducer prod = ssn.createProducer(queue);
- for (int i=0; i<10;i++)
+ Queue queue = getTestQueue();
+
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
+
+ Connection connection = getConnection();
+ connection.start();
+ // Create Consumer A
+ Session consSessA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerA = consSessA.createConsumer(queue);
+
+ // ensure message delivery to consumer A is started (required for 0-8..0-9-1)
+ final Message msg = consumerA.receiveNoWait();
+ assertNull(msg);
+
+ Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ sendMessage(producerSession, queue, 3);
+
+ // Create Consumer B
+ MessageConsumer consumerB = null;
+ if (isBroker010())
+ {
+ // 0-10 prefetch is per consumer so we create Consumer B on the same session as Consumer A
+ consumerB = consSessA.createConsumer(queue);
+ }
+ else
{
- prod.send(ssn.createTextMessage("Msg" + i));
+ // 0-8..0-9-1 prefetch is per session so we create Consumer B on a separate session
+ Session consSessB = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumerB = consSessB.createConsumer(queue);
}
- MessageConsumer consumer = ssn.createConsumer(queue);
- // This is to ensure we get the first client to prefetch.
- Message msg = consumer.receive(1000);
- assertNotNull("The first consumer should get one message",msg);
- con.stop();
-
- Connection con2 = getConnection();
- con2.start();
- Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer2 = ssn2.createConsumer(queue);
- for (int i=0; i<9;i++)
+ // As message delivery to consumer A is already started, the first two messages should
+ // now be with consumer A. The last message will still be on the Broker as consumer A's
+ // credit is exhausted and message delivery for consumer B is not yet running.
+
+ // As described by QPID-3747, for 0-10 we *must* check Consumer B before Consumer A.
+ // If we were to reverse the order, the SessionComplete will restore Consumer A's credit,
+ // and the third message could be delivered to either Consumer A or Consumer B.
+
+ // Check that consumer B gets the last (third) message.
+ final Message msgConsumerB = consumerB.receive(1500);
+ assertNotNull("Consumer B should have received a message", msgConsumerB);
+ assertEquals("Consumer B received message with unexpected index", 2, msgConsumerB.getIntProperty(INDEX));
+
+ // Now check that consumer A has indeed got the first two messages.
+ for (int i = 0; i < 2; i++)
{
- TextMessage m = (TextMessage)consumer2.receive(1000);
- assertNotNull("The second consumer should get 9 messages, but received only " + i,m);
+ final Message msgConsumerA = consumerA.receive(1500);
+ assertNotNull("Consumer A should have received a message " + i, msgConsumerA);
+ assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX));
}
}
-
}
+
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
index 5f758061d5..a258e4267d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
+
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
@@ -37,9 +38,13 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.management.common.mbeans.ManagedConnection;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ManagedConnectionMBeanTest extends QpidBrokerTestCase
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ManagedConnectionMBeanTest.class);
+
/**
* JMX helper.
*/
@@ -120,9 +125,10 @@ public class ManagedConnectionMBeanTest extends QpidBrokerTestCase
_connection.close();
+ LOGGER.debug("Querying JMX for number of open connections");
connections = _jmxUtils.getManagedConnections("test");
assertNotNull("Connection MBean is not found", connections);
- assertEquals("Unexpected number of connection mbeans", 0, connections.size());
+ assertEquals("Unexpected number of connection mbeans after connection closed", 0, connections.size());
}
public void testCommit() throws Exception
@@ -218,13 +224,13 @@ public class ManagedConnectionMBeanTest extends QpidBrokerTestCase
mBean.rollbackTransactions(channelId.intValue());
Message m = consumer.receive(1000l);
- assertNull("Unexpected message received", m);
+ assertNull("Unexpected message received: " + String.valueOf(m), m);
producerSession.commit();
_connection.start();
m = consumer.receive(1000l);
- assertNull("Unexpected message received", m);
+ assertNull("Unexpected message received after commit " + String.valueOf(m), m);
}
public void testAuthorisedId() throws Exception
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
index ec96f778f6..07faf1ef3e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
@@ -31,6 +31,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
@@ -195,7 +196,7 @@ public class ExchangeLoggingTest extends AbstractTestLogging
ExchangeDeleteBody body = registry.createExchangeDeleteBody(0, new AMQShortString(_name), false, true);
- AMQFrame exchangeDeclare = body.generateFrame(0);
+ AMQFrame exchangeDeclare = body.generateFrame(((AMQSession)_session).getChannelId());
((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeleteOkBody.class);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java
new file mode 100644
index 0000000000..a179b96768
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java
@@ -0,0 +1,324 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.AMQBindingURL;
+
+import javax.jms.*;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+
+public class MessageProtocolConversionTest extends QpidBrokerTestCase
+{
+
+ private static final int TIMEOUT = 1500;
+ private Connection _connection_0_9_1;
+ private Connection _connection_0_10;
+
+ private static final boolean BOOLEAN_TEST_VAL = true;
+ private static final byte BYTE_TEST_VAL = (byte) 4;
+ private static final byte[] BYTES_TEST_VAL = {5, 4, 3, 2, 1};
+ private static final char CHAR_TEST_VAL = 'x';
+ private static final double DOUBLE_TEST_VAL = Double.MAX_VALUE;
+ private static final float FLOAT_TEST_VAL = Float.MAX_VALUE;
+ private static final int INT_TEST_VAL = -73;
+ private static final long LONG_TEST_VAL = Long.MIN_VALUE / 2l;
+ private static final short SHORT_TEST_VAL = -586;
+ private static final String STRING_TEST_VAL = "This is a test text message";
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ setTestSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
+ _connection_0_10 = getConnection();
+ setTestSystemProperty(ClientProperties.AMQP_VERSION, "0-9-1");
+ _connection_0_9_1 = getConnection();
+ }
+
+ public void test0_9_1_to_0_10_conversion() throws JMSException, AMQException
+ {
+ doConversionTests(_connection_0_9_1, _connection_0_10);
+ }
+
+ public void test_0_10_to_0_9_1_conversion() throws JMSException, AMQException
+ {
+
+ doConversionTests(_connection_0_10, _connection_0_9_1);
+ }
+
+ private void doConversionTests(Connection producerConn, Connection consumerConn) throws JMSException, AMQException
+ {
+ Session producerSession = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = getTestQueue();
+
+ MessageProducer producer = producerSession.createProducer(queue);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+ consumerConn.start();
+ producerConn.start();
+
+ // Text Message
+
+ Message m = producerSession.createTextMessage(STRING_TEST_VAL);
+ producer.send(m);
+ m = consumer.receive(TIMEOUT);
+
+ assertNotNull("Expected text message did not arrive", m);
+ assertTrue("Received message not an instance of TextMessage (" + m.getClass().getName() + " instead)", m instanceof TextMessage);
+ assertEquals("Message text not as expected", STRING_TEST_VAL, ((TextMessage) m).getText());
+
+ // Map Message
+
+ MapMessage mapMessage = producerSession.createMapMessage();
+ mapMessage.setBoolean("boolean", BOOLEAN_TEST_VAL);
+ mapMessage.setByte("byte", BYTE_TEST_VAL);
+ mapMessage.setBytes("bytes", BYTES_TEST_VAL);
+ mapMessage.setChar("char", CHAR_TEST_VAL);
+ mapMessage.setDouble("double", DOUBLE_TEST_VAL);
+ mapMessage.setFloat("float", FLOAT_TEST_VAL);
+ mapMessage.setInt("int", INT_TEST_VAL);
+ mapMessage.setLong("long", LONG_TEST_VAL);
+ mapMessage.setShort("short", SHORT_TEST_VAL);
+ mapMessage.setString("string", STRING_TEST_VAL);
+
+ producer.send(mapMessage);
+
+ m = consumer.receive(TIMEOUT);
+
+ assertNotNull("Expected map message message did not arrive", m);
+ assertTrue("Received message not an instance of MapMessage (" + m.getClass().getName() + " instead)", m instanceof MapMessage);
+ MapMessage receivedMapMessage = (MapMessage) m;
+ assertEquals("Map message boolean value not as expected", BOOLEAN_TEST_VAL, receivedMapMessage.getBoolean("boolean"));
+ assertEquals("Map message byte value not as expected", BYTE_TEST_VAL, receivedMapMessage.getByte("byte"));
+ assertTrue("Map message bytes value not as expected", Arrays.equals(BYTES_TEST_VAL, receivedMapMessage.getBytes("bytes")));
+ assertEquals("Map message char value not as expected", CHAR_TEST_VAL, receivedMapMessage.getChar("char"));
+ assertEquals("Map message double value not as expected", DOUBLE_TEST_VAL, receivedMapMessage.getDouble("double"));
+ assertEquals("Map message float value not as expected", FLOAT_TEST_VAL, receivedMapMessage.getFloat("float"));
+ assertEquals("Map message int value not as expected", INT_TEST_VAL, receivedMapMessage.getInt("int"));
+ assertEquals("Map message long value not as expected", LONG_TEST_VAL, receivedMapMessage.getLong("long"));
+ assertEquals("Map message short value not as expected", SHORT_TEST_VAL, receivedMapMessage.getShort("short"));
+ assertEquals("Map message string value not as expected", STRING_TEST_VAL, receivedMapMessage.getString("string"));
+ ArrayList expectedNames = Collections.list(mapMessage.getMapNames());
+ Collections.sort(expectedNames);
+ ArrayList actualNames = Collections.list(receivedMapMessage.getMapNames());
+ Collections.sort(actualNames);
+ assertEquals("Map message keys not as expected", expectedNames, actualNames);
+
+ // Stream Message
+
+ StreamMessage streamMessage = producerSession.createStreamMessage();
+ streamMessage.writeString(STRING_TEST_VAL);
+ streamMessage.writeShort(SHORT_TEST_VAL);
+ streamMessage.writeLong(LONG_TEST_VAL);
+ streamMessage.writeInt(INT_TEST_VAL);
+ streamMessage.writeFloat(FLOAT_TEST_VAL);
+ streamMessage.writeDouble(DOUBLE_TEST_VAL);
+ streamMessage.writeChar(CHAR_TEST_VAL);
+ streamMessage.writeBytes(BYTES_TEST_VAL);
+ streamMessage.writeByte(BYTE_TEST_VAL);
+ streamMessage.writeBoolean(BOOLEAN_TEST_VAL);
+
+ producer.send(streamMessage);
+
+ m = consumer.receive(TIMEOUT);
+
+ assertNotNull("Expected stream message message did not arrive", m);
+ assertTrue("Received message not an instance of StreamMessage (" + m.getClass().getName() + " instead)", m instanceof StreamMessage);
+ StreamMessage receivedStreamMessage = (StreamMessage) m;
+
+ assertEquals("Stream message read string not as expected", STRING_TEST_VAL, receivedStreamMessage.readString());
+ assertEquals("Stream message read short not as expected", SHORT_TEST_VAL, receivedStreamMessage.readShort());
+ assertEquals("Stream message read long not as expected", LONG_TEST_VAL, receivedStreamMessage.readLong());
+ assertEquals("Stream message read int not as expected", INT_TEST_VAL, receivedStreamMessage.readInt());
+ assertEquals("Stream message read float not as expected", FLOAT_TEST_VAL, receivedStreamMessage.readFloat());
+ assertEquals("Stream message read double not as expected", DOUBLE_TEST_VAL, receivedStreamMessage.readDouble());
+ assertEquals("Stream message read char not as expected", CHAR_TEST_VAL, receivedStreamMessage.readChar());
+ byte[] bytesVal = new byte[BYTES_TEST_VAL.length];
+ receivedStreamMessage.readBytes(bytesVal);
+ assertTrue("Stream message read bytes not as expected", Arrays.equals(BYTES_TEST_VAL, bytesVal));
+ assertEquals("Stream message read byte not as expected", BYTE_TEST_VAL, receivedStreamMessage.readByte());
+ assertEquals("Stream message read boolean not as expected", BOOLEAN_TEST_VAL, receivedStreamMessage.readBoolean());
+
+ try
+ {
+ receivedStreamMessage.readByte();
+ fail("Unexpected remaining bytes in stream message");
+ }
+ catch(MessageEOFException e)
+ {
+ // pass
+ }
+
+ // Object Message
+
+ ObjectMessage objectMessage = producerSession.createObjectMessage();
+ objectMessage.setObject(STRING_TEST_VAL);
+
+ producer.send(objectMessage);
+
+ m = consumer.receive(TIMEOUT);
+
+ assertNotNull("Expected object message message did not arrive", m);
+ assertTrue("Received message not an instance of ObjectMessage (" + m.getClass().getName() + " instead)", m instanceof ObjectMessage);
+ ObjectMessage receivedObjectMessage = (ObjectMessage) m;
+ assertEquals("Object message value not as expected", STRING_TEST_VAL, receivedObjectMessage.getObject());
+
+
+ // Bytes Message
+
+ BytesMessage bytesMessage = producerSession.createBytesMessage();
+ bytesMessage.writeBytes(BYTES_TEST_VAL);
+
+ producer.send(bytesMessage);
+
+ m = consumer.receive(TIMEOUT);
+
+ assertNotNull("Expected bytes message message did not arrive", m);
+ assertTrue("Received message not an instance of BytesMessage (" + m.getClass().getName() + " instead)", m instanceof BytesMessage);
+ BytesMessage receivedBytesMessage = (BytesMessage) m;
+ bytesVal = new byte[BYTES_TEST_VAL.length];
+ receivedBytesMessage.readBytes(bytesVal);
+ assertTrue("Bytes message read bytes not as expected", Arrays.equals(BYTES_TEST_VAL, bytesVal));
+
+ try
+ {
+ receivedBytesMessage.readByte();
+ fail("Unexpected remaining bytes in stream message");
+ }
+ catch(MessageEOFException e)
+ {
+ // pass
+ }
+
+ // Headers / properties tests
+
+ Message msg = producerSession.createMessage();
+ msg.setJMSCorrelationID("testCorrelationId");
+ msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ msg.setJMSPriority(7);
+ msg.setJMSType("testType");
+
+ msg.setBooleanProperty("boolean", BOOLEAN_TEST_VAL);
+ msg.setByteProperty("byte", BYTE_TEST_VAL);
+ msg.setDoubleProperty("double", DOUBLE_TEST_VAL);
+ msg.setFloatProperty("float", FLOAT_TEST_VAL);
+ msg.setIntProperty("int", INT_TEST_VAL);
+ msg.setLongProperty("long", LONG_TEST_VAL);
+ msg.setShortProperty("short", SHORT_TEST_VAL);
+ msg.setStringProperty("string", STRING_TEST_VAL);
+
+ producer.send(msg);
+
+ m = consumer.receive(TIMEOUT);
+ assertNotNull("Expected message did not arrive", m);
+ assertEquals("JMSMessageID differs", msg.getJMSMessageID(), m.getJMSMessageID());
+ assertEquals("JMSCorrelationID differs",msg.getJMSCorrelationID(),m.getJMSCorrelationID());
+ assertEquals("JMSDeliveryMode differs",msg.getJMSDeliveryMode(),m.getJMSDeliveryMode());
+ assertEquals("JMSPriority differs",msg.getJMSPriority(),m.getJMSPriority());
+ assertEquals("JMSType differs",msg.getJMSType(),m.getJMSType());
+
+ assertEquals("Message boolean property not as expected", BOOLEAN_TEST_VAL, m.getBooleanProperty("boolean"));
+ assertEquals("Message byte property not as expected", BYTE_TEST_VAL, m.getByteProperty("byte"));
+ assertEquals("Message double property not as expected", DOUBLE_TEST_VAL, m.getDoubleProperty("double"));
+ assertEquals("Message float property not as expected", FLOAT_TEST_VAL, m.getFloatProperty("float"));
+ assertEquals("Message int property not as expected", INT_TEST_VAL, m.getIntProperty("int"));
+ assertEquals("Message long property not as expected", LONG_TEST_VAL, m.getLongProperty("long"));
+ assertEquals("Message short property not as expected", SHORT_TEST_VAL, m.getShortProperty("short"));
+ assertEquals("Message string property not as expected", STRING_TEST_VAL, m.getStringProperty("string"));
+
+ ArrayList<String> sentPropNames = Collections.list(msg.getPropertyNames());
+ Collections.sort(sentPropNames);
+ ArrayList<String> receivedPropNames = Collections.list(m.getPropertyNames());
+ Collections.sort(receivedPropNames);
+
+ // Shouldn't really need to do this, the client should be hiding these from us
+ removeSyntheticProperties(sentPropNames);
+ removeSyntheticProperties(receivedPropNames);
+
+ assertEquals("Property names were not as expected", sentPropNames, receivedPropNames);
+
+ // Test Reply To Queue
+
+ Destination replyToDestination = producerSession.createTemporaryQueue();
+ MessageConsumer replyToConsumer = producerSession.createConsumer(replyToDestination);
+ msg = producerSession.createMessage();
+ msg.setJMSReplyTo(replyToDestination);
+ producer.send(msg);
+
+ m = consumer.receive(TIMEOUT);
+ assertNotNull("Expected message did not arrive", m);
+ assertNotNull("Message does not have ReplyTo set", m.getJMSReplyTo());
+
+ MessageProducer responseProducer = consumerSession.createProducer(m.getJMSReplyTo());
+ responseProducer.send(consumerSession.createMessage());
+
+ assertNotNull("Expected response message did not arrive", replyToConsumer.receive(TIMEOUT));
+
+ // Test Reply To Topic
+
+ replyToDestination = producerSession.createTemporaryTopic();
+ replyToConsumer = producerSession.createConsumer(replyToDestination);
+ msg = producerSession.createMessage();
+ msg.setJMSReplyTo(replyToDestination);
+ producer.send(msg);
+
+ m = consumer.receive(TIMEOUT);
+ assertNotNull("Expected message did not arrive", m);
+ assertNotNull("Message does not have ReplyTo set", m.getJMSReplyTo());
+
+ responseProducer = consumerSession.createProducer(m.getJMSReplyTo());
+ responseProducer.send(consumerSession.createMessage());
+
+ assertNotNull("Expected response message did not arrive", replyToConsumer.receive(TIMEOUT));
+
+
+ }
+
+ private void removeSyntheticProperties(ArrayList<String> propNames)
+ {
+ Iterator<String> nameIter = propNames.iterator();
+ while(nameIter.hasNext())
+ {
+ String propName = nameIter.next();
+ if(propName.startsWith("x-jms") || propName.startsWith("JMS_QPID"))
+ {
+ nameIter.remove();
+ }
+ }
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ _connection_0_9_1.close();
+ _connection_0_10.close();
+ super.tearDown();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
index 08a932eba1..11e1da18b3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
@@ -147,7 +147,6 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
assertNotNull("Consumer 2 should have received first message", cs2Received);
- cs1Received.acknowledge();
cs2Received.acknowledge();
Message cs2Received2 = consumer2.receive(1000);
@@ -156,6 +155,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
assertEquals("Differing groups", cs2Received2.getStringProperty("group"),
cs2Received.getStringProperty("group"));
+ cs1Received.acknowledge();
Message cs1Received2 = consumer1.receive(1000);
assertNotNull("Consumer 1 should have received second message", cs1Received2);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
index bf4dbcb19f..181fe9d34e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
@@ -20,10 +20,11 @@
package org.apache.qpid.server.queue;
import java.util.Arrays;
-import java.util.Calendar;
import java.util.HashMap;
-import java.util.Locale;
import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -128,11 +129,10 @@ public class SortedQueueTest extends QpidBrokerTestCase
final TestConsumerThread consumerThread = new TestConsumerThread(sessionMode, queue);
consumerThread.start();
- final Calendar cal = Calendar.getInstance(Locale.UK);
for(String value : VALUES)
{
- final Message msg = _producerSession.createTextMessage(String.valueOf(cal.getTimeInMillis()));
+ final Message msg = _producerSession.createMessage();
msg.setStringProperty(TEST_SORT_KEY, value);
producer.send(msg);
_producerSession.commit();
@@ -273,22 +273,22 @@ public class SortedQueueTest extends QpidBrokerTestCase
_consumerConnection.start();
//Receive 3 in sorted order
- received = receiveAndValidateMessage(consumer, "1");
+ received = assertReceiveAndValidateMessage(consumer, "1");
received.acknowledge();
- received = receiveAndValidateMessage(consumer, "2");
+ received = assertReceiveAndValidateMessage(consumer, "2");
received.acknowledge();
- received = receiveAndValidateMessage(consumer, "3");
+ received = assertReceiveAndValidateMessage(consumer, "3");
received.acknowledge();
//Send 1
sendAndCommitMessage(producer,"4");
//Receive 1 and recover
- received = receiveAndValidateMessage(consumer, "4");
+ received = assertReceiveAndValidateMessage(consumer, "4");
consumerSession.recover();
//Receive same 1
- received = receiveAndValidateMessage(consumer, "4");
+ received = assertReceiveAndValidateMessage(consumer, "4");
received.acknowledge();
//Send 3 out of order
@@ -296,33 +296,39 @@ public class SortedQueueTest extends QpidBrokerTestCase
sendAndCommitMessage(producer,"6");
sendAndCommitMessage(producer,"5");
- //Receive 1 of 3 (out of order due to pre-fetch) and recover
- received = receiveAndValidateMessage(consumer, "7");
+ //Receive 1 of 3 (possibly out of order due to pre-fetch)
+ final Message messageBeforeRollback = assertReceiveMessage(consumer);
consumerSession.recover();
if (isBroker010())
{
//Receive 3 in sorted order (not as per JMS recover)
- received = receiveAndValidateMessage(consumer, "5");
+ received = assertReceiveAndValidateMessage(consumer, "5");
received.acknowledge();
- received = receiveAndValidateMessage(consumer, "6");
+ received = assertReceiveAndValidateMessage(consumer, "6");
received.acknowledge();
- received = receiveAndValidateMessage(consumer, "7");
+ received = assertReceiveAndValidateMessage(consumer, "7");
received.acknowledge();
}
else
{
- //Receive 3 in partial sorted order due to recover
- received = receiveAndValidateMessage(consumer, "7");
+ //First message will be the one rolled-back (as per JMS spec).
+ final String messageKeyDeliveredBeforeRollback = messageBeforeRollback.getStringProperty(TEST_SORT_KEY);
+ received = assertReceiveAndValidateMessage(consumer, messageKeyDeliveredBeforeRollback);
received.acknowledge();
- received = receiveAndValidateMessage(consumer, "5");
+
+ //Remaining two messages will be sorted
+ final SortedSet<String> keys = new TreeSet<String>(Arrays.asList("5", "6", "7"));
+ keys.remove(messageKeyDeliveredBeforeRollback);
+
+ received = assertReceiveAndValidateMessage(consumer, keys.first());
received.acknowledge();
- received = receiveAndValidateMessage(consumer, "6");
+ received = assertReceiveAndValidateMessage(consumer, keys.last());
received.acknowledge();
}
}
- protected Queue createQueue() throws AMQException, JMSException
+ private Queue createQueue() throws AMQException, JMSException
{
final Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put(AMQQueueFactory.QPID_QUEUE_SORT_KEY, TEST_SORT_KEY);
@@ -345,15 +351,22 @@ public class SortedQueueTest extends QpidBrokerTestCase
_producerSession.commit();
}
- private Message receiveAndValidateMessage(final MessageConsumer consumer, final String expectedKey) throws JMSException
+ private Message assertReceiveAndValidateMessage(final MessageConsumer consumer, final String expectedKey) throws JMSException
{
- final Message received = (TextMessage) consumer.receive(10000);
- assertNotNull("Received message is unexpectedly null", received);
+ final Message received = assertReceiveMessage(consumer);
assertEquals("Received message with unexpected sorted key value", expectedKey,
received.getStringProperty(TEST_SORT_KEY));
return received;
}
+ private Message assertReceiveMessage(final MessageConsumer consumer)
+ throws JMSException
+ {
+ final Message received = (TextMessage) consumer.receive(10000);
+ assertNotNull("Received message is unexpectedly null", received);
+ return received;
+ }
+
private class TestConsumerThread extends Thread
{
private boolean _stopped = false;
@@ -388,9 +401,8 @@ public class SortedQueueTest extends QpidBrokerTestCase
conn.start();
- TextMessage msg;
- Calendar cal = Calendar.getInstance(Locale.UK);
- while((msg = (TextMessage) consumer.receive(1000)) != null)
+ Message msg;
+ while((msg = consumer.receive(1000)) != null)
{
if(_sessionType == Session.SESSION_TRANSACTED)
{
@@ -427,9 +439,7 @@ public class SortedQueueTest extends QpidBrokerTestCase
}
_count++;
- LOGGER.debug("Message consumed at : " + cal.getTimeInMillis());
LOGGER.debug("Message consumed with key: " + msg.getStringProperty(TEST_SORT_KEY));
- LOGGER.debug("Message consumed with text: " + msg.getText());
LOGGER.debug("Message consumed with consumed index: " + _consumed);
}
@@ -439,8 +449,8 @@ public class SortedQueueTest extends QpidBrokerTestCase
}
catch(JMSException e)
{
- e.printStackTrace();
- }
+ LOGGER.error("Exception in listener", e);
+ }
}
public synchronized boolean isStopped()
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
index 12039caf25..e6461c8267 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
@@ -70,6 +70,11 @@ public class AcknowledgeTest extends QpidBrokerTestCase
// These should all end up being prefetched by session
sendMessage(_consumerSession, _queue, 1);
+ if(!transacted)
+ {
+ ((AMQSession)_consumerSession).sync();
+ }
+
assertEquals("Wrong number of messages on queue", 1,
((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
deleted file mode 100644
index 474a425b28..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.client;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.TopicSession;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQConnectionTest extends QpidBrokerTestCase
-{
- protected static AMQConnection _connection;
- protected static AMQTopic _topic;
- protected static AMQQueue _queue;
- private static QueueSession _queueSession;
- private static TopicSession _topicSession;
- protected static final Logger _logger = LoggerFactory.getLogger(AMQConnectionTest.class);
-
- protected void setUp() throws Exception
- {
- super.setUp();
- createConnection();
- _topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic"));
- _queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue"));
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- _connection.close();
- super.tearDown(); //To change body of overridden methods use File | Settings | File Templates.
- }
-
- protected void createConnection() throws Exception
- {
- _connection = (AMQConnection) getConnection("guest", "guest");
- }
-
- /**
- * Simple tests to check we can create TopicSession and QueueSession ok
- * And that they throw exceptions where appropriate as per JMS spec
- */
-
- public void testCreateQueueSession() throws JMSException
- {
- createQueueSession();
- }
-
- private void createQueueSession() throws JMSException
- {
- _queueSession = _connection.createQueueSession(false, AMQSession.NO_ACKNOWLEDGE);
- }
-
- public void testCreateTopicSession() throws JMSException
- {
- createTopicSession();
- }
-
- private void createTopicSession() throws JMSException
- {
- _topicSession = _connection.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
- }
-
- public void testTopicSessionCreateBrowser() throws JMSException
- {
- createTopicSession();
- try
- {
- _topicSession.createBrowser(_queue);
- fail("expected exception did not occur");
- }
- catch (javax.jms.IllegalStateException s)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected javax.jms.IllegalStateException, got " + e);
- }
- }
-
- public void testTopicSessionCreateQueue() throws JMSException
- {
- createTopicSession();
- try
- {
- _topicSession.createQueue("abc");
- fail("expected exception did not occur");
- }
- catch (javax.jms.IllegalStateException s)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected javax.jms.IllegalStateException, got " + e);
- }
- }
-
- public void testTopicSessionCreateTemporaryQueue() throws JMSException
- {
- createTopicSession();
- try
- {
- _topicSession.createTemporaryQueue();
- fail("expected exception did not occur");
- }
- catch (javax.jms.IllegalStateException s)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected javax.jms.IllegalStateException, got " + e);
- }
- }
-
- public void testQueueSessionCreateTemporaryTopic() throws JMSException
- {
- createQueueSession();
- try
- {
- _queueSession.createTemporaryTopic();
- fail("expected exception did not occur");
- }
- catch (javax.jms.IllegalStateException s)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected javax.jms.IllegalStateException, got " + e);
- }
- }
-
- public void testQueueSessionCreateTopic() throws JMSException
- {
- createQueueSession();
- try
- {
- _queueSession.createTopic("abc");
- fail("expected exception did not occur");
- }
- catch (javax.jms.IllegalStateException s)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected javax.jms.IllegalStateException, got " + e);
- }
- }
-
- public void testQueueSessionDurableSubscriber() throws JMSException
- {
- createQueueSession();
- try
- {
- _queueSession.createDurableSubscriber(_topic, "abc");
- fail("expected exception did not occur");
- }
- catch (javax.jms.IllegalStateException s)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected javax.jms.IllegalStateException, got " + e);
- }
- }
-
- public void testQueueSessionUnsubscribe() throws JMSException
- {
- createQueueSession();
- try
- {
- _queueSession.unsubscribe("abc");
- fail("expected exception did not occur");
- }
- catch (javax.jms.IllegalStateException s)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected javax.jms.IllegalStateException, got " + e);
- }
- }
-
- public void testPrefetchSystemProperty() throws Exception
- {
- _connection.close();
- setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
-
- createConnection();
- _connection.start();
- // Create two consumers on different sessions
- Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumerA = consSessA.createConsumer(_queue);
-
- Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer producer = producerSession.createProducer(_queue);
-
- // Send 3 messages
- for (int i = 0; i < 3; i++)
- {
- producer.send(producerSession.createTextMessage("test"));
- }
- producerSession.commit();
-
- MessageConsumer consumerB = null;
- // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer.
- if (!isBroker010())
- {
- Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- consumerB = consSessB.createConsumer(_queue);
- }
- else
- {
- consumerB = consSessA.createConsumer(_queue);
- }
-
- Message msg;
- // Check that consumer A has 2 messages
- for (int i = 0; i < 2; i++)
- {
- msg = consumerA.receive(1500);
- assertNotNull("Consumer A should receive 2 messages",msg);
- }
-
- msg = consumerA.receive(1500);
- assertNull("Consumer A should not have received a 3rd message",msg);
-
- // Check that consumer B has the last message
- msg = consumerB.receive(1500);
- assertNotNull("Consumer B should have received the message",msg);
- }
-
-
-
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java
deleted file mode 100644
index 5f3daa407a..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.test.unit.client;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionURL;
-
-public class AMQSSLConnectionTest extends AMQConnectionTest
-{
- private static final String KEYSTORE = "test-profiles/test_resources/ssl/java_client_keystore.jks";
- private static final String KEYSTORE_PASSWORD = "password";
- private static final String TRUSTSTORE = "test-profiles/test_resources/ssl/java_client_truststore.jks";
- private static final String TRUSTSTORE_PASSWORD = "password";
-
- @Override
- protected void setUp() throws Exception
- {
- setTestClientSystemProperty("profile.use_ssl", "true");
- setConfigurationProperty("connector.ssl.enabled", "true");
- setConfigurationProperty("connector.ssl.sslOnly", "true");
- super.setUp();
- }
-
- protected void createConnection() throws Exception
- {
-
- final String sslPrototypeUrl = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" +
- "?ssl='true'&ssl_verify_hostname='false'" +
- "&key_store='%s'&key_store_password='%s'" +
- "&trust_store='%s'&trust_store_password='%s'" +
- "'";
-
- final String url = String.format(sslPrototypeUrl,System.getProperty("test.port.ssl"),
- KEYSTORE,KEYSTORE_PASSWORD,TRUSTSTORE,TRUSTSTORE_PASSWORD);
-
- _connection = (AMQConnection) getConnection(new AMQConnectionURL(url));
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
index 93cceb1048..c33dde53b7 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
@@ -101,10 +101,4 @@ public class AMQSessionTest extends QpidBrokerTestCase
assertEquals("Queue names should match from QueueReceiver with selector", _queue.getQueueName(), receiver.getQueue().getQueueName());
}
- public static void stopVmBrokers()
- {
- _queue = null;
- _topic = null;
- _session = null;
- }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java
new file mode 100644
index 0000000000..ef90ab8ffe
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Ensures that queue specific session factory method {@link QueueConnection#createQueueSession()} create sessions
+ * of type {@link QueueSession} and that those sessions correctly restrict the available JMS operations
+ * operations to exclude those applicable to only topics.
+ *
+ * @see TopicSessionFactoryTest
+ */
+public class QueueSessionFactoryTest extends QpidBrokerTestCase
+{
+ public void testQueueSessionIsNotATopicSession() throws Exception
+ {
+ QueueSession queueSession = getQueueSession();
+ assertFalse(queueSession instanceof TopicSession);
+ }
+
+ public void testQueueSessionCannotCreateTemporaryTopics() throws Exception
+ {
+ QueueSession queueSession = getQueueSession();
+ try
+ {
+ queueSession.createTemporaryTopic();
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createTemporaryTopic from QueueSession", s.getMessage());
+ }
+ }
+
+ public void testQueueSessionCannotCreateTopics() throws Exception
+ {
+ QueueSession queueSession = getQueueSession();
+ try
+ {
+ queueSession.createTopic("abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createTopic from QueueSession", s.getMessage());
+ }
+ }
+
+ public void testQueueSessionCannotCreateDurableSubscriber() throws Exception
+ {
+ QueueSession queueSession = getQueueSession();
+ Topic topic = getTestTopic();
+
+ try
+ {
+ queueSession.createDurableSubscriber(topic, "abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createDurableSubscriber from QueueSession", s.getMessage());
+ }
+ }
+
+ public void testQueueSessionCannoutUnsubscribe() throws Exception
+ {
+ QueueSession queueSession = getQueueSession();
+ try
+ {
+ queueSession.unsubscribe("abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call unsubscribe from QueueSession", s.getMessage());
+ }
+ }
+
+ private QueueSession getQueueSession() throws Exception
+ {
+ QueueConnection queueConnection = (QueueConnection)getConnection();
+ return queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java
new file mode 100644
index 0000000000..6397f15e0a
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Ensures that topic specific session factory method {@link TopicConnection#createTopicSession()} create sessions
+ * of type {@link TopicSession} and that those sessions correctly restrict the available JMS operations
+ * operations to exclude those applicable to only queues.
+ *
+ * @see QueueSessionFactoryTest
+ */
+public class TopicSessionFactoryTest extends QpidBrokerTestCase
+{
+ public void testTopicSessionIsNotAQueueSession() throws Exception
+ {
+ TopicSession topicSession = getTopicSession();
+ assertFalse(topicSession instanceof QueueSession);
+ }
+
+ public void testTopicSessionCannotCreateCreateBrowser() throws Exception
+ {
+ TopicSession topicSession = getTopicSession();
+ Queue queue = getTestQueue();
+ try
+ {
+ topicSession.createBrowser(queue);
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createBrowser from TopicSession", s.getMessage());
+ }
+ }
+
+ public void testTopicSessionCannotCreateQueues() throws Exception
+ {
+ TopicSession topicSession = getTopicSession();
+ try
+ {
+ topicSession.createQueue("abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createQueue from TopicSession", s.getMessage());
+ }
+ }
+
+ public void testTopicSessionCannotCreateTemporaryQueues() throws Exception
+ {
+ TopicSession topicSession = getTopicSession();
+ try
+ {
+ topicSession.createTemporaryQueue();
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createTemporaryQueue from TopicSession", s.getMessage());
+ }
+ }
+
+ private TopicSession getTopicSession() throws Exception
+ {
+ TopicConnection topicConnection = (TopicConnection)getConnection();
+ return topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index d3d9cf2984..e948aaffb3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -721,7 +721,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
msg.setBooleanProperty("Match", false);
producer.send(msg);
-
+ ((AMQSession)session).sync();
// should be 1 or 2 messages on queue now
// (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only)
AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose");
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index f680a20288..9a8da14f83 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -46,6 +46,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
@@ -56,6 +57,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
@@ -1101,6 +1103,15 @@ public class QpidBrokerTestCase extends QpidTestCase
return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName());
}
+ /**
+ * Return a Topic specific for this test.
+ * Uses getTestQueueName() as the name of the topic
+ * @return
+ */
+ public Topic getTestTopic()
+ {
+ return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, getTestQueueName());
+ }
protected void tearDown() throws java.lang.Exception
{
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index d8c463b810..015aa19a8e 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -23,10 +23,6 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateEx
// QPID-3576: Java client issue. MessageConsumer#close() time-out.
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testDeleteOptions
-//This test requires SSL, but SSL is only enabled for the C++ broker in the cpp.ssl test profile
-//which runs *all* the tests with SSL, so this one can be excluded safely enough
-org.apache.qpid.test.unit.client.AMQSSLConnectionTest#*
-
org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#*
org.apache.qpid.client.ResetMessageListenerTest#*
@@ -178,3 +174,5 @@ org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseGroupAssignm
org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseWithRelease
org.apache.qpid.server.queue.MessageGroupQueueTest#testGroupAssignmentSurvivesEmpty
+// CPP Broker does not implement message conversion from 0-9-1
+org.apache.qpid.server.message.MessageProtocolConversionTest#*
diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes
index 057d7c2c44..ba96232fe3 100644
--- a/qpid/java/test-profiles/JavaPre010Excludes
+++ b/qpid/java/test-profiles/JavaPre010Excludes
@@ -21,8 +21,10 @@
//Exclude the following from brokers using the 0-8/0-9/0-9-1 protocols
//======================================================================
-// This test requires a broker capable of 0-8/0-9/0-9-1 and 0-10 concurrently
+// These tests requires a broker capable of 0-8/0-9/0-9-1 and 0-10 concurrently
org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend
+org.apache.qpid.server.message.MessageProtocolConversionTest#*
+
// QPID-2478 test fails when run against broker using 0-8/9
org.apache.qpid.test.client.message.JMSDestinationTest#testGetDestinationWithCustomExchange