diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-03-01 15:50:27 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-03-01 15:50:27 +0000 |
commit | 92be7e8f3163c048a8642d2deeaa921bbb65dc9c (patch) | |
tree | 5c0fa12fa0787d8870eab113469f74ccfb07a5be | |
parent | e78f6a9e59098ac104892a79b74c9895272b292e (diff) | |
download | qpid-python-92be7e8f3163c048a8642d2deeaa921bbb65dc9c.tar.gz |
NO-JIRA: [AMQP 1-0 Sandbox] merging from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1295635 13f79535-47bb-0310-9956-ffa450edef68
71 files changed, 1754 insertions, 750 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 2e2d2f0b11..8884a99923 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -30,12 +30,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.bind.tuple.StringBinding; import com.sleepycat.je.*; @@ -94,6 +96,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _log = Logger.getLogger(BDBMessageStore.class); + private static final int LOCK_RETRY_ATTEMPTS = 5; + static final int DATABASE_FORMAT_VERSION = 5; private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version"; public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; @@ -893,103 +897,133 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { // _log.debug("public void removeMessage(Long messageId = " + messageId): called"); - + boolean complete = false; com.sleepycat.je.Transaction tx = null; - Cursor cursor = null; + Random rand = null; + int attempts = 0; try { - tx = _environment.beginTransaction(null, null); - - //remove the message meta data from the store - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); - - if (_log.isDebugEnabled()) + do { - _log.debug("Removing message id " + messageId); - } + tx = null; + try + { + tx = _environment.beginTransaction(null, null); + //remove the message meta data from the store + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); - OperationStatus status = _messageMetaDataDb.delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " + - messageId); - } + if (_log.isDebugEnabled()) + { + _log.debug("Removing message id " + messageId); + } - if (_log.isDebugEnabled()) - { - _log.debug("Deleted metadata for message " + messageId); - } - //now remove the content data from the store if there is any. + OperationStatus status = _messageMetaDataDb.delete(tx, key); + if (status == OperationStatus.NOTFOUND) + { + _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " + + messageId); + } - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - MessageContentKey_5 mck = new MessageContentKey_5(messageId,0); + if (_log.isDebugEnabled()) + { + _log.debug("Deleted metadata for message " + messageId); + } - TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); - contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); + //now remove the content data from the store if there is any. - //Use a partial record for the value to prevent retrieving the - //data itself as we only need the key to identify what to remove. - DatabaseEntry value = new DatabaseEntry(); - value.setPartial(0, 0, true); - cursor = _messageContentDb.openCursor(tx, null); - status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW); - while (status == OperationStatus.SUCCESS) - { - mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); + int offset = 0; + do + { + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + MessageContentKey_5 mck = new MessageContentKey_5(messageId,offset); + TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); + contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); + //Use a partial record for the value to prevent retrieving the + //data itself as we only need the key to identify what to remove. + DatabaseEntry value = new DatabaseEntry(); + value.setPartial(0, 4, true); + + status = _messageContentDb.get(null,contentKeyEntry, value, LockMode.READ_COMMITTED); + + if(status == OperationStatus.SUCCESS) + { - if(mck.getMessageId() != messageId) - { - //we have exhausted all chunks for this message id, break - break; + offset += IntegerBinding.entryToInt(value); + _messageContentDb.delete(tx, contentKeyEntry); + if (_log.isDebugEnabled()) + { + _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId); + } + } + } + while (status == OperationStatus.SUCCESS); + + commit(tx, sync); + complete = true; + tx = null; } - else + catch (LockConflictException e) { - status = cursor.delete(); + try + { + if(tx != null) + { + tx.abort(); + } + } + catch(DatabaseException e2) + { + _log.warn("Unable to abort transaction after LockConflictExcption", e2); + // rethrow the original log conflict exception, the secondary exception should already have + // been logged. + throw e; + } + - if(status == OperationStatus.NOTFOUND) + _log.warn("Lock timeout exception. Retrying (attempt " + + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e); + + if(++attempts < LOCK_RETRY_ATTEMPTS) { - cursor.close(); - cursor = null; + if(rand == null) + { + rand = new Random(); + } - tx.abort(); - throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId); - } + try + { + Thread.sleep(500l + (long)(500l * rand.nextDouble())); + } + catch (InterruptedException e1) + { - if (_log.isDebugEnabled()) + } + } + else { - _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId); + // rethrow the lock conflict exception since we could not solve by retrying + throw e; } } - - status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); } - - cursor.close(); - cursor = null; - - commit(tx, sync); + while(!complete); } catch (DatabaseException e) { - e.printStackTrace(); + _log.error("Unexpected BDB exception", e); if (tx != null) { try { - if(cursor != null) - { - cursor.close(); - cursor = null; - } - tx.abort(); + tx = null; } catch (DatabaseException e1) { @@ -1001,15 +1035,16 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } finally { - if(cursor != null) + if (tx != null) { try { - cursor.close(); + tx.abort(); + tx = null; } - catch (DatabaseException e) + catch (DatabaseException e1) { - throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e); + throw new AMQStoreException("Error aborting transaction " + e1, e1); } } } @@ -2073,7 +2108,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { // RHM-7 Periodically wake up and check, just in case we // missed a notification. Don't want to lock the broker hard. - _lock.wait(250); + _lock.wait(1000); } catch (InterruptedException e) { diff --git a/qpid/java/broker/src/main/java/log4j.properties b/qpid/java/broker/src/main/java/log4j.properties deleted file mode 100644 index 6788c65463..0000000000 --- a/qpid/java/broker/src/main/java/log4j.properties +++ /dev/null @@ -1,24 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -log4j.rootCategory=${amqj.logging.level}, console - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.Threshold=all -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 873c846258..c0ecbb3630 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -74,27 +74,20 @@ import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.SubscriptionImpl; +import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -public class AMQChannel implements SessionConfig, AMQSessionModel +public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; @@ -133,6 +126,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private final MessageStore _messageStore; + private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>(); + + private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; + private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); // Set of messages being acknoweledged in the current transaction @@ -185,7 +182,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _messageStore = messageStore; // by default the session is non-transactional - _transaction = new AutoCommitTransaction(_messageStore); + _transaction = new AsyncAutoCommitTransaction(_messageStore, this); _clientDeliveryMethod = session.createDeliveryMethod(_channelId); } @@ -204,14 +201,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel public boolean isTransactional() { - // this does not look great but there should only be one "non-transactional" - // transactional context, while there could be several transactional ones in - // theory - return !(_transaction instanceof AutoCommitTransaction); + return _transaction.isTransactional(); } public void receivedComplete() { + sync(); } @@ -267,7 +262,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException { - if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), info.getRoutingKey().asString(), e.getName())) + String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString(); + if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), routingKey, e.getName())) { throw new AMQSecurityException("Permission denied: " + e.getName()); } @@ -1562,4 +1558,69 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } } + + public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + { + _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); + } + + public void completeAsyncCommands() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) + { + cmd.complete(); + _unfinishedCommandsQueue.poll(); + } + while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) + { + cmd = _unfinishedCommandsQueue.poll(); + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + + public void sync() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.poll()) != null) + { + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + private static class AsyncCommand + { + private final MessageStore.StoreFuture _future; + private ServerTransaction.Action _action; + + public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + { + _future = future; + _action = action; + } + + void awaitReadyForCompletion() + { + _future.waitForCompletion(); + } + + void complete() + { + if(!_future.isComplete()) + { + _future.waitForCompletion(); + } + _action.postCommit(); + _action = null; + } + + boolean isReadyForCompletion() + { + return _future.isComplete(); + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java index 5c1814590c..e3d8747d72 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -417,12 +417,12 @@ public class Broker else { System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath()); - System.err.println("Using the fallback internal log4j.properties configuration"); + System.err.println("Using the fallback internal fallback-log4j.properties configuration"); - InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties"); + InputStream propsFile = this.getClass().getResourceAsStream("/fallback-log4j.properties"); if(propsFile == null) { - throw new IOException("Unable to load the fallback internal log4j.properties configuration file"); + throw new IOException("Unable to load the fallback internal fallback-log4j.properties configuration file"); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java index d587ef0c16..1b0168df56 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java @@ -24,6 +24,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -52,6 +53,11 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
MethodRegistry methodRegistry = session.getMethodRegistry();
@@ -71,7 +77,7 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
}
-
+ channel.sync();
session.writeFrame(response.generateFrame(channelId));
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 29054f55c1..bc2a2dca04 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -68,6 +68,7 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC { MethodRegistry methodRegistry = session.getMethodRegistry(); BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag()); + channel.sync(); session.writeFrame(cancelOkBody.generateFrame(channelId)); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 8875f21d0b..a1cfb14753 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -60,6 +60,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } else { + channel.sync(); if (_logger.isDebugEnabled()) { _logger.debug("BasicConsume: from '" + body.getQueue() + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index bbb009003c..2073299467 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -75,6 +75,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } else { + channel.sync(); AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue()); if (queue == null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index dd3281c65f..2cf043dd26 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -46,7 +46,7 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> { throw body.getChannelNotFoundException(channelId); } - + channel.sync(); channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index c7842cd643..429217321c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -65,6 +65,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic { MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) session.getMethodRegistry(); AMQMethodBody recoverOk = methodRegistry.createBasicRecoverOkBody(); + channel.sync(); session.writeFrame(recoverOk.generateFrame(channelId)); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java index f9feada6fe..1e2a83b922 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java @@ -59,7 +59,7 @@ public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<B {
throw body.getChannelNotFoundException(channelId);
}
-
+ channel.sync();
channel.resend(body.getRequeue());
// Qpid 0-8 hacks a synchronous -ok onto recover.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 32aa99534b..ecffd1b9cb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -65,6 +65,7 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos { throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel"); } + channel.sync(); session.closeChannel(channelId); // Client requested closure so we don't wait for ok we send it stateManager.getProtocolSession().closeChannelOk(channelId); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index 5ccaa49de8..365c8bd9c6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -55,6 +55,7 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB { throw body.getChannelNotFoundException(channelId); } + channel.sync(); channel.setSuspended(!body.getActive()); _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 21aea1510b..53835f381f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; @@ -69,7 +70,12 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); MethodRegistry methodRegistry = session.getMethodRegistry(); - + final AMQChannel channel = session.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + channel.sync(); AMQShortString exchangeName = body.getExchange(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 98a0d33487..69cf0c9e20 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -55,6 +56,11 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange VirtualHost virtualHost = session.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + final AMQChannel channel = session.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } if (_logger.isDebugEnabled()) { @@ -102,6 +108,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { MethodRegistry methodRegistry = session.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); + channel.sync(); session.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index 586aaf9336..339085691f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.ExchangeDeleteOkBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -49,7 +50,12 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - + final AMQChannel channel = session.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + channel.sync(); try { if(exchangeRegistry.getExchange(body.getExchange()) == null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 0eb69e4b16..bb979d5441 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -64,18 +64,18 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> VirtualHost virtualHost = protocolConnection.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + AMQChannel channel = protocolConnection.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } final AMQQueue queue; final AMQShortString routingKey; if (body.getQueue() == null) { - AMQChannel channel = protocolConnection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId); - } queue = channel.getDefaultQueue(); @@ -150,6 +150,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> } if (!body.getNowait()) { + channel.sync(); MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); protocolConnection.writeFrame(responseBody.generateFrame(channelId)); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 693b316607..32cd1c2d08 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -197,6 +197,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if (!body.getNowait()) { + channel.sync(); MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 902e3ade85..107e485275 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -71,7 +71,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB { throw body.getChannelNotFoundException(channelId); } - + channel.sync(); AMQQueue queue; if (body.getQueue() == null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index 6c3e11be5b..7d609f9064 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -109,7 +109,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod if(!body.getNowait()) { - + channel.sync(); MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); protocolConnection.writeFrame(responseBody.generateFrame(channelId)); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java index 3849c5af19..9915627a94 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java @@ -132,6 +132,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB // 0-8 does not support QueueUnbind throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + session.getProtocolVersion(), null); } + channel.sync(); session.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java index 483bca894e..fa06a99204 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java @@ -20,22 +20,23 @@ */ package org.apache.qpid.server.output; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.*; import org.apache.qpid.AMQPInvalidClassException; +import java.util.HashMap; import java.util.Map; public class HeaderPropertiesConverter { - public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage) + public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage, VirtualHost vhost) { BasicContentHeaderProperties props = new BasicContentHeaderProperties(); @@ -82,11 +83,23 @@ public class HeaderPropertiesConverter } if(messageProps.hasMessageId()) { - props.setMessageId(messageProps.getMessageId().toString()); + props.setMessageId("ID:" + messageProps.getMessageId().toString()); } + if(messageProps.hasReplyTo()) + { + ReplyTo replyTo = messageProps.getReplyTo(); + String exchangeName = replyTo.getExchange(); + String routingKey = replyTo.getRoutingKey(); + if(exchangeName == null) + { + exchangeName = ""; + } - // TODO Reply-to + Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName); + String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString(); + props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'")); + } if(messageProps.hasUserId()) { props.setUserId(new AMQShortString(messageProps.getUserId())); @@ -94,7 +107,12 @@ public class HeaderPropertiesConverter if(messageProps.hasApplicationHeaders()) { - Map<String, Object> appHeaders = messageProps.getApplicationHeaders(); + Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders()); + if(messageProps.getApplicationHeaders().containsKey("x-jms-type")) + { + props.setType(String.valueOf(appHeaders.remove("x-jms-type"))); + } + FieldTable ft = new FieldTable(); for(Map.Entry<String, Object> entry : appHeaders.entrySet()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index efd904f6aa..1e62e5e9ca 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -91,7 +91,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter else { final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID); chb.bodySize = message.getSize(); return chb; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 010afcb1a9..78507b0cf2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -86,7 +86,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter else { final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID); chb.bodySize = message.getSize(); return chb; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java index 5e2b3e4556..9102b6c651 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java @@ -85,7 +85,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter else { final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID); chb.bodySize = message.getSize(); return chb; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 241ab3fcb9..547f2440db 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -1069,7 +1069,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { try { - closeSession(); + try + { + closeSession(); + } + finally + { + closeProtocolSession(); + } } catch (AMQException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 4faca15eb0..dfad9157c5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -275,10 +275,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1")) { - String defaultGroup = String.valueOf(arguments.get(QPID_DEFAULT_MESSAGE_GROUP)); + Object defaultGroup = arguments.get(QPID_DEFAULT_MESSAGE_GROUP); _messageGroupManager = new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), - defaultGroup == null ? QPID_NO_GROUP : defaultGroup, + defaultGroup == null ? QPID_NO_GROUP : defaultGroup.toString(), this); } else @@ -2255,9 +2255,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public boolean equals(Object o) { - assert o != null; - assert o instanceof QueueEntryListener; - return _sub == ((QueueEntryListener) o)._sub; + return o != null + && o instanceof SimpleAMQQueue.QueueEntryListener + && _sub == ((QueueEntryListener) o)._sub; } public int hashCode() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 3f02442704..865b3d1f48 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.queue; import java.util.Map; @@ -7,7 +26,11 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public class SortedQueue extends OutOfOrderQueue { - private String _sortedPropertyName; + //Lock object to synchronize enqueue. Used instead of the object + //monitor to prevent lock order issues with subscription sendLocks + //and consumer updates in the super classes + private final Object _sortedQueueLock = new Object(); + private final String _sortedPropertyName; protected SortedQueue(final String name, final boolean durable, final String owner, final boolean autoDelete, final boolean exclusive, @@ -23,8 +46,11 @@ public class SortedQueue extends OutOfOrderQueue return _sortedPropertyName; } - public synchronized void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException + public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException { - super.enqueue(message, action); + synchronized (_sortedQueueLock) + { + super.enqueue(message, action); + } } }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java index 7a70795e77..87c79178f0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.queue; public class SortedQueueEntryListFactory implements QueueEntryListFactory diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java index 42818db214..689e48b4cf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java @@ -172,7 +172,6 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager { EntryFinder visitor = new EntryFinder(sub); sub.getQueue().visit(visitor); - _logger.debug("Earliest available entry for " + sub + " is " + visitor.getEntry() + (visitor.getEntry() == null ? "" : " : " + getKey(visitor.getEntry()))); return visitor.getEntry(); } @@ -250,12 +249,10 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager { if(newState == QueueEntry.State.ACQUIRED) { - _logger.debug("Adding to " + _group); _group.add(); } else if(oldState == QueueEntry.State.ACQUIRED) { - _logger.debug("Subtracting from " + _group); _group.subtract(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 1e36119ce8..bde756dd03 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -51,23 +51,15 @@ import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.transport.ServerSession; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.*; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.url.AMQBindingURL; +import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; @@ -85,7 +77,6 @@ import java.nio.ByteBuffer; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject { - private final long _subscriptionID; private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); @@ -450,7 +441,53 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr messageProps.setCorrelationId(properties.getCorrelationId().getBytes()); } - // TODO - ReplyTo + if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0) + { + String origReplyToString = properties.getReplyTo().asString(); + ReplyTo replyTo = new ReplyTo(); + // if the string looks like a binding URL, then attempt to parse it... + try + { + AMQBindingURL burl = new AMQBindingURL(origReplyToString); + AMQShortString routingKey = burl.getRoutingKey(); + if(routingKey != null) + { + replyTo.setRoutingKey(routingKey.asString()); + } + + AMQShortString exchangeName = burl.getExchangeName(); + if(exchangeName != null) + { + replyTo.setExchange(exchangeName.asString()); + } + } + catch (URISyntaxException e) + { + replyTo.setRoutingKey(origReplyToString); + } + messageProps.setReplyTo(replyTo); + + } + + if(properties.getMessageId() != null) + { + try + { + String messageIdAsString = properties.getMessageIdAsString(); + if(messageIdAsString.startsWith("ID:")) + { + messageIdAsString = messageIdAsString.substring(3); + } + UUID uuid = UUID.fromString(messageIdAsString); + messageProps.setMessageId(uuid); + } + catch(IllegalArgumentException e) + { + // ignore - can't parse + } + } + + if(properties.getUserId() != null) { @@ -459,7 +496,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr FieldTable fieldTable = properties.getHeaders(); - final Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable); + Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable); + + if(properties.getType() != null) + { + appHeaders.put("x-jms-type", properties.getTypeAsString()); + } messageProps.setApplicationHeaders(appHeaders); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index ab07ed20f6..04cdbf2b25 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -62,6 +62,8 @@ import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ServerConnection extends Connection implements Managable, AMQConnectionModel, LogSubject, AuthorizationHolder { @@ -123,6 +125,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec { _virtualHost.getConnectionRegistry().deregisterConnection(this); } + unregisterConnectionMbean(); } if (state == State.CLOSED) @@ -161,15 +164,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec initialiseStatistics(); - try - { - _mBean = new ServerConnectionMBean(this); - _mBean.register(); - } - catch (JMException jme) - { - log.error("Unable to create mBean for ServerConnection",jme); - } + registerConnectionMbean(); } public void setConnectionConfig(final ConnectionConfig config) @@ -285,11 +280,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec public void close(AMQConstant cause, String message) throws AMQException { closeSubscriptions(); - if (_mBean != null) - { - _mBean.unregister(); - _mBean = null; - } + unregisterConnectionMbean(); ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; try { @@ -433,11 +424,6 @@ public class ServerConnection extends Connection implements Managable, AMQConnec public void closed() { closeSubscriptions(); - if (_mBean != null) - { - _mBean.unregister(); - _mBean = null; - } super.closed(); } @@ -483,4 +469,30 @@ public class ServerConnection extends Connection implements Managable, AMQConnec _mBean.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value"); } } + + private void registerConnectionMbean() + { + try + { + _mBean = new ServerConnectionMBean(this); + _mBean.register(); + } + catch (JMException jme) + { + log.error("Unable to register mBean for ServerConnection", jme); + } + } + + private void unregisterConnectionMbean() + { + if (_mBean != null) + { + if (log.isDebugEnabled()) + { + log.debug("Unregistering mBean for ServerConnection" + _mBean); + } + _mBean.unregister(); + _mBean = null; + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 2142b2f7c3..62a1e2b0f5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -27,6 +27,7 @@ import java.security.Principal; import java.text.MessageFormat; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.SortedMap; @@ -63,7 +64,7 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription_0_10; -import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -84,18 +85,20 @@ import org.apache.qpid.transport.SessionDelegate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject +public class ServerSession extends Session + implements AuthorizationHolder, SessionConfig, + AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder { private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30; + private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; private final UUID _id; private ConnectionConfig _connectionConfig; private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); - private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction(); private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); @@ -147,7 +150,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { super(connection, delegate, name, expiry); _connectionConfig = connConfig; - _transaction = new AutoCommitTransaction(this.getMessageStore()); + _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this); _logSubject = new ChannelLogSubject(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); @@ -184,16 +187,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); } getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - PostEnqueueAction postTransactionAction; - if(isTransactional()) - { - postTransactionAction = new PostEnqueueAction(queues, message) ; - } - else - { - postTransactionAction = _postEnqueueAction; - postTransactionAction.setState(queues, message); - } + PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ; _transaction.enqueue(queues,message, postTransactionAction, 0L); incrementOutstandingTxnsIfNecessary(); updateTransactionalActivity(); @@ -221,12 +215,12 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void accept(RangeSet ranges) { dispositionChange(ranges, new MessageDispositionAction() - { - public void performAction(MessageDispositionChangeListener listener) - { - listener.onAccept(); - } - }); + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onAccept(); + } + }); } @@ -444,10 +438,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public boolean isTransactional() { - // this does not look great but there should only be one "non-transactional" - // transactional context, while there could be several transactional ones in - // theory - return !(_transaction instanceof AutoCommitTransaction); + return _transaction.isTransactional(); } public boolean inTransaction() @@ -765,6 +756,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { subscription_0_10.flushCreditState(false); } + awaitCommandCompletion(); } private class PostEnqueueAction implements ServerTransaction.Action @@ -774,17 +766,12 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private ServerMessage _message; private final boolean _transactional; - public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message) + public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional) { - _transactional = true; + _transactional = transactional; setState(queues, message); } - public PostEnqueueAction() - { - _transactional = false; - } - public void setState(List<? extends BaseQueue> queues, ServerMessage message) { _message = message; @@ -830,4 +817,76 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { return _blocking.get(); } + + private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>(); + + public void completeAsyncCommands() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) + { + cmd.complete(); + _unfinishedCommandsQueue.poll(); + } + while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) + { + cmd = _unfinishedCommandsQueue.poll(); + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + + public void awaitCommandCompletion() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.poll()) != null) + { + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + + public Object getAsyncCommandMark() + { + return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast(); + } + + public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + { + _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); + } + + private static class AsyncCommand + { + private final MessageStore.StoreFuture _future; + private ServerTransaction.Action _action; + + public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + { + _future = future; + _action = action; + } + + void awaitReadyForCompletion() + { + _future.waitForCompletion(); + } + + void complete() + { + if(!_future.isComplete()) + { + _future.waitForCompletion(); + } + _action.postCommit(); + _action = null; + } + + boolean isReadyForCompletion() + { + return _future.isComplete(); + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index b6e142a5fd..2eab65cf8a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.transport; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -55,6 +54,7 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.Subscription_0_10; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; @@ -81,9 +81,22 @@ public class ServerSessionDelegate extends SessionDelegate if(!session.isClosing()) { - super.command(session, method); + Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark(); + super.command(session, method, false); + Object newOutstanding = ((ServerSession)session).getAsyncCommandMark(); + if(newOutstanding == null || newOutstanding == asyncCommandMark) + { + session.processed(method); + } + + if(newOutstanding != null) + { + ((ServerSession)session).completeAsyncCommands(); + } + if (method.isSync()) { + ((ServerSession)session).awaitCommandCompletion(); session.flushProcessed(); } } @@ -98,7 +111,13 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageAccept(Session session, MessageAccept method) { - ((ServerSession)session).accept(method.getTransfers()); + final ServerSession serverSession = (ServerSession) session; + serverSession.accept(method.getTransfers()); + if(!serverSession.isTransactional()) + { + serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, + new CommandProcessedAction(serverSession, method)); + } } @Override @@ -252,7 +271,7 @@ public class ServerSessionDelegate extends SessionDelegate } @Override - public void messageTransfer(Session ssn, MessageTransfer xfr) + public void messageTransfer(Session ssn, final MessageTransfer xfr) { final Exchange exchange = getExchangeForMessage(ssn, xfr); @@ -294,12 +313,13 @@ public class ServerSessionDelegate extends SessionDelegate exchangeInUse = exchange; } + final ServerSession serverSession = (ServerSession) ssn; if(!queues.isEmpty()) { final MessageStore store = getVirtualHost(ssn).getMessageStore(); final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); - MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference()); - ((ServerSession) ssn).enqueue(message, queues); + MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); + serverSession.enqueue(message, queues); storeMessage.flushToStore(); } else @@ -313,13 +333,19 @@ public class ServerSessionDelegate extends SessionDelegate } else { - ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey())); + serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey())); } } - - ssn.processed(xfr); + if(serverSession.isTransactional()) + { + serverSession.processed(xfr); + } + else + { + serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); + } } private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr, @@ -404,6 +430,13 @@ public class ServerSessionDelegate extends SessionDelegate @Override + public void executionSync(final Session ssn, final ExecutionSync sync) + { + ((ServerSession)ssn).awaitCommandCompletion(); + super.executionSync(ssn, sync); + } + + @Override public void exchangeDeclare(Session session, ExchangeDeclare method) { String exchangeName = method.getExchange(); @@ -1269,4 +1302,25 @@ public class ServerSessionDelegate extends SessionDelegate final ServerConnection scon = (ServerConnection) session.getConnection(); SecurityManager.setThreadSubject(scon.getAuthorizedSubject()); } + + private static class CommandProcessedAction implements ServerTransaction.Action + { + private final ServerSession _serverSession; + private final Method _method; + + public CommandProcessedAction(final ServerSession serverSession, final Method xfr) + { + _serverSession = serverSession; + _method = xfr; + } + + public void postCommit() + { + _serverSession.processed(_method); + } + + public void onRollback() + { + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java new file mode 100755 index 0000000000..7e238aeadc --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -0,0 +1,329 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStore.StoreFuture; + +import java.util.Collection; +import java.util.List; + +/** + * An implementation of ServerTransaction where each enqueue/dequeue + * operation takes place within it own transaction. + * + * Since there is no long-lived transaction, the commit and rollback methods of + * this implementation are empty. + */ +public class AsyncAutoCommitTransaction implements ServerTransaction +{ + protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class); + + private final MessageStore _messageStore; + private final FutureRecorder _futureRecorder; + + public static interface FutureRecorder + { + public void recordFuture(StoreFuture future, Action action); + + } + + public AsyncAutoCommitTransaction(MessageStore transactionLog, FutureRecorder recorder) + { + _messageStore = transactionLog; + _futureRecorder = recorder; + } + + public long getTransactionStartTime() + { + return 0L; + } + + /** + * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered + * by the caller are executed immediately. + */ + public void addPostTransactionAction(final Action immediateAction) + { + addFuture(MessageStore.IMMEDIATE_FUTURE, immediateAction); + + } + + public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) + { + MessageStore.Transaction txn = null; + try + { + MessageStore.StoreFuture future; + if(message.isPersistent() && queue.isDurable()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); + } + + txn = _messageStore.newTransaction(); + txn.dequeueMessage(queue, message); + future = txn.commitTranAsync(); + + txn = null; + } + else + { + future = MessageStore.IMMEDIATE_FUTURE; + } + addFuture(future, postTransactionAction); + postTransactionAction = null; + } + catch (AMQException e) + { + _logger.error("Error during message dequeue", e); + throw new RuntimeException("Error during message dequeue", e); + } + finally + { + rollbackIfNecessary(postTransactionAction, txn); + } + + } + + private void addFuture(final MessageStore.StoreFuture future, final Action action) + { + if(action != null) + { + if(future.isComplete()) + { + action.postCommit(); + } + else + { + _futureRecorder.recordFuture(future, action); + } + } + } + + public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) + { + MessageStore.Transaction txn = null; + try + { + for(QueueEntry entry : queueEntries) + { + ServerMessage message = entry.getMessage(); + BaseQueue queue = entry.getQueue(); + + if(message.isPersistent() && queue.isDurable()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); + } + + if(txn == null) + { + txn = _messageStore.newTransaction(); + } + + txn.dequeueMessage(queue, message); + } + + } + MessageStore.StoreFuture future; + if(txn != null) + { + future = txn.commitTranAsync(); + txn = null; + } + else + { + future = MessageStore.IMMEDIATE_FUTURE; + } + addFuture(future, postTransactionAction); + postTransactionAction = null; + } + catch (AMQException e) + { + _logger.error("Error during message dequeues", e); + throw new RuntimeException("Error during message dequeues", e); + } + finally + { + rollbackIfNecessary(postTransactionAction, txn); + } + + } + + + public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) + { + MessageStore.Transaction txn = null; + try + { + MessageStore.StoreFuture future; + if(message.isPersistent() && queue.isDurable()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); + } + + txn = _messageStore.newTransaction(); + txn.enqueueMessage(queue, message); + future = txn.commitTranAsync(); + txn = null; + } + else + { + future = MessageStore.IMMEDIATE_FUTURE; + } + addFuture(future, postTransactionAction); + postTransactionAction = null; + } + catch (AMQException e) + { + _logger.error("Error during message enqueue", e); + throw new RuntimeException("Error during message enqueue", e); + } + finally + { + rollbackIfNecessary(postTransactionAction, txn); + } + + + } + + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) + { + MessageStore.Transaction txn = null; + try + { + + if(message.isPersistent()) + { + for(BaseQueue queue : queues) + { + if (queue.isDurable()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); + } + if (txn == null) + { + txn = _messageStore.newTransaction(); + } + + txn.enqueueMessage(queue, message); + + + } + } + + } + MessageStore.StoreFuture future; + if (txn != null) + { + future = txn.commitTranAsync(); + txn = null; + } + else + { + future = MessageStore.IMMEDIATE_FUTURE; + } + addFuture(future, postTransactionAction); + postTransactionAction = null; + + + } + catch (AMQException e) + { + _logger.error("Error during message enqueues", e); + throw new RuntimeException("Error during message enqueues", e); + } + finally + { + rollbackIfNecessary(postTransactionAction, txn); + } + + } + + + public void commit(final Runnable immediatePostTransactionAction) + { + if(immediatePostTransactionAction != null) + { + addFuture(MessageStore.IMMEDIATE_FUTURE, new Action() + { + public void postCommit() + { + immediatePostTransactionAction.run(); + } + + public void onRollback() + { + } + }); + } + } + + public void commit() + { + } + + public void rollback() + { + } + + public boolean isTransactional() + { + return false; + } + + private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn) + { + if (txn != null) + { + try + { + txn.abortTran(); + } + catch (AMQStoreException e) + { + _logger.error("Abort transaction failed", e); + // Deliberate decision not to re-throw this exception. Rationale: we can only reach here if a previous + // TransactionLog method has ended in Exception. If we were to re-throw here, we would prevent + // our caller from receiving the original exception (which is likely to be more revealing of the underlying error). + } + } + if (postTransactionAction != null) + { + postTransactionAction.onRollback(); + } + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index a67d4badd1..ad2a299108 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -242,6 +242,11 @@ public class AutoCommitTransaction implements ServerTransaction { } + public boolean isTransactional() + { + return false; + } + private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn) { if (txn != null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 7f5b5fb8b2..34bac0411e 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -309,7 +309,12 @@ public class LocalTransaction implements ServerTransaction private void resetDetails() { _transaction = null; - _postTransactionActions.clear(); + _postTransactionActions.clear(); _txnStartTime = 0L; } + + public boolean isTransactional() + { + return true; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index acdf712de9..c568ae67aa 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -41,6 +41,8 @@ import org.apache.qpid.server.queue.QueueEntry; */ public interface ServerTransaction { + + /** * Represents an action to be performed on transaction commit or rollback */ @@ -108,4 +110,6 @@ public interface ServerTransaction * be executed immediately after the underlying transaction has rolled-back. */ void rollback(); + + boolean isTransactional(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java index 43fb5b4cb3..99e05851ca 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index 34ad0e5668..d177993886 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.AMQMessage; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java index 999b22299c..8311aa80ce 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java @@ -40,7 +40,12 @@ import org.apache.qpid.url.BindingURL; * to support any destination defined in AMQP 0-10 spec. */ public class AMQAnyDestination extends AMQDestination implements Queue, Topic -{ +{ + protected AMQAnyDestination() + { + super(); + } + public AMQAnyDestination(BindingURL binding) { super(binding); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 802cc55b0e..0ded689ea6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -211,7 +211,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec + " port: " + brokerDetail.getPort() + " vhost: " + _conn.getVirtualHost() + " username: " + _conn.getUsername() + " password: " - + _conn.getPassword()); + + "********"); } ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 92602ac3a2..61fe722423 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -151,6 +151,10 @@ public abstract class AMQDestination implements Destination, Referenceable return defaultDestSyntax; } + protected AMQDestination() + { + } + protected AMQDestination(Address address) throws Exception { this._address = address; @@ -186,6 +190,11 @@ public abstract class AMQDestination implements Destination, Referenceable protected AMQDestination(String str) throws URISyntaxException { + parseDestinationString(str); + } + + protected void parseDestinationString(String str) throws URISyntaxException + { _destSyntax = getDestType(str); str = stripSyntaxPrefix(str); if (_destSyntax == DestSyntax.BURL) @@ -305,6 +314,16 @@ public abstract class AMQDestination implements Destination, Referenceable } } + public void setDestinationString(String str) throws Exception + { + parseDestinationString(str); + } + + public String getDestinationString() + { + return toString(); + } + public DestSyntax getDestSyntax() { return _destSyntax; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 5bd1bd629a..5ecb5d5913 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -30,6 +30,10 @@ import org.apache.qpid.url.BindingURL; public class AMQQueue extends AMQDestination implements Queue { + protected AMQQueue() + { + super(); + } public AMQQueue(String address) throws URISyntaxException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 784b75af10..48c4e3e3e6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -371,7 +371,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). */ - protected volatile boolean _usingDispatcherForCleanup; + private volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; @@ -3570,3 +3570,4 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 49b77dcc7b..8395c8f4b7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -795,43 +795,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (suspend) { - synchronized (getMessageDeliveryLock()) - { - for (BasicMessageConsumer consumer : _consumers.values()) - { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), - Option.UNRELIABLE); - sync(); - List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); - _prefetchedMessageTags.addAll(tags); - } - } - - _usingDispatcherForCleanup = true; - syncDispatchQueue(); - _usingDispatcherForCleanup = false; - - RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); - RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); - RangeSet all = RangeSetFactory.createRangeSet(delivered.size() - + prefetched.size()); - - for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) - { - Range range = deliveredIter.next(); - all.add(range); - } - - for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) - { - Range range = prefetchedIter.next(); - all.add(range); - } - - flushProcessed(all, false); - getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); - getQpidSession().messageRelease(prefetched); - sync(); + for (BasicMessageConsumer consumer : _consumers.values()) + { + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); + } } else { @@ -1387,3 +1355,4 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().sync(); } } + diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 780dbcafc2..5969d9a5a5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -43,6 +43,11 @@ public class AMQTopic extends AMQDestination implements Topic super(address); } + protected AMQTopic() + { + super(); + } + /** * Constructor for use in creating a topic using a BindingURL. * diff --git a/qpid/java/client/test/example_build.xml b/qpid/java/client/test/example_build.xml index 329c12982c..dda3cb4263 100644 --- a/qpid/java/client/test/example_build.xml +++ b/qpid/java/client/test/example_build.xml @@ -73,7 +73,7 @@ </javac> <copy todir="${example.classes}"> - <!-- copy any non java src files into the build tree, e.g. log4j.properties --> + <!-- copy any non java src files into the build tree, e.g. properties files --> <fileset dir="${example.src}"> <exclude name="**/*.java"/> <exclude name="**/package.html"/> diff --git a/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java b/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java index 2c7fe7b8ed..d9c12148cb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java @@ -35,41 +35,42 @@ import static org.apache.qpid.messaging.util.PyPrint.pprint; public class Address { + private String _name; + private String _subject; + private Map _options; + private final String _myToString; + public static Address parse(String address) { return new AddressParser(address).parse(); } - private String name; - private String subject; - private Map options; - public Address(String name, String subject, Map options) { - this.name = name; - this.subject = subject; - this.options = options; + this._name = name; + this._subject = subject; + this._options = options; + this._myToString = String.format("%s/%s; %s", pprint(_name), pprint(_subject), pprint(_options)); } public String getName() { - return name; + return _name; } public String getSubject() { - return subject; + return _subject; } public Map getOptions() { - return options; + return _options; } public String toString() { - return String.format("%s/%s; %s", pprint(name), pprint(subject), - pprint(options)); + return _myToString; } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 5a9ea73cae..3fc596d0eb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -33,7 +33,6 @@ import static org.apache.qpid.transport.Session.State.RESUMING; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.transport.network.Frame; -import static org.apache.qpid.transport.util.Functions.mod; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; import static org.apache.qpid.util.Serial.ge; @@ -44,11 +43,9 @@ import static org.apache.qpid.util.Serial.max; import static org.apache.qpid.util.Strings.toUTF8; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,7 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class Session extends SessionInvoker { private static final Logger log = Logger.get(Session.class); - + public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED } static class DefaultSessionListener implements SessionListener @@ -113,7 +110,9 @@ public class Session extends SessionInvoker // outgoing command count private int commandsOut = 0; - private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)]; + private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024); + private Map<Integer,Method> commands = new HashMap<Integer, Method>(); + private final Object commandsLock = new Object(); private int commandBytes = 0; private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024); private int maxComplete = commandsOut - 1; @@ -196,7 +195,7 @@ public class Session extends SessionInvoker public void setAutoSync(boolean value) { - synchronized (commands) + synchronized (commandsLock) { this.autoSync = value; } @@ -204,10 +203,10 @@ public class Session extends SessionInvoker protected void setState(State state) { - synchronized (commands) + synchronized (commandsLock) { this.state = state; - commands.notifyAll(); + commandsLock.notifyAll(); } } @@ -276,13 +275,13 @@ public class Session extends SessionInvoker void resume() { _failoverRequired.set(false); - synchronized (commands) + synchronized (commandsLock) { attach(); for (int i = maxComplete + 1; lt(i, commandsOut); i++) { - Method m = commands[mod(i, commands.length)]; + Method m = getCommand(i); if (m == null) { m = new ExecutionSync(); @@ -337,11 +336,27 @@ public class Session extends SessionInvoker } } + private Method getCommand(int i) + { + return commands.get(i); + } + + private void setCommand(int commandId, Method command) + { + commands.put(commandId, command); + } + + private Method removeCommand(int id) + { + return commands.remove(id); + } + void dump() { - synchronized (commands) + synchronized (commandsLock) { - for (Method m : commands) + TreeMap<Integer, Method> ordered = new TreeMap<Integer, Method>(commands); + for (Method m : ordered.values()) { if (m != null) { @@ -484,7 +499,7 @@ public class Session extends SessionInvoker copy = processed.copy(); } - synchronized (commands) + synchronized (commandsLock) { if (state == DETACHED || state == CLOSING || state == CLOSED) { @@ -539,18 +554,16 @@ public class Session extends SessionInvoker { log.debug("%s complete(%d, %d)", this, lower, upper); } - synchronized (commands) + synchronized (commandsLock) { int old = maxComplete; for (int id = max(maxComplete, lower); le(id, upper); id++) { - int idx = mod(id, commands.length); - Method m = commands[idx]; + Method m = removeCommand(id); if (m != null) { commandBytes -= m.getBodySize(); m.complete(); - commands[idx] = null; } } if (le(lower, maxComplete + 1)) @@ -563,7 +576,7 @@ public class Session extends SessionInvoker log.debug("%s commands remaining: %s", this, commandsOut - maxComplete); } - commands.notifyAll(); + commandsLock.notifyAll(); return gt(maxComplete, old); } } @@ -596,7 +609,7 @@ public class Session extends SessionInvoker protected boolean isCommandsFull(int id) { - return id - maxComplete >= commands.length; + return id - maxComplete >= commandLimit; } public void invoke(Method m) @@ -613,7 +626,7 @@ public class Session extends SessionInvoker acquireCredit(); } - synchronized (commands) + synchronized (commandsLock) { if (state == DETACHED && m.isUnreliable()) { @@ -629,7 +642,7 @@ public class Session extends SessionInvoker Thread current = Thread.currentThread(); if (!current.equals(resumer) ) { - Waiter w = new Waiter(commands, timeout); + Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && (state != OPEN && state != CLOSED)) { checkFailoverRequired("Command was interrupted because of failover, before being sent"); @@ -678,7 +691,7 @@ public class Session extends SessionInvoker if (isFull(next)) { - Waiter w = new Waiter(commands, timeout); + Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && isFull(next) && state != CLOSED) { if (state == OPEN || state == RESUMING) @@ -735,7 +748,7 @@ public class Session extends SessionInvoker if ((replayTransfer) || m.hasCompletionListener()) { - commands[mod(next, commands.length)] = m; + setCommand(next, m); commandBytes += m.getBodySize(); } if (autoSync) @@ -817,7 +830,7 @@ public class Session extends SessionInvoker public void sync(long timeout) { log.debug("%s sync()", this); - synchronized (commands) + synchronized (commandsLock) { int point = commandsOut - 1; @@ -826,19 +839,13 @@ public class Session extends SessionInvoker executionSync(SYNC); } - Waiter w = new Waiter(commands, timeout); + Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { checkFailoverRequired("Session sync was interrupted by failover."); if(log.isDebugEnabled()) { - List<Method> waitingFor = - Arrays.asList(commands) - .subList(mod(maxComplete,commands.length), - mod(commandsOut-1, commands.length) < mod(maxComplete, commands.length) - ? commands.length-1 - : mod(commandsOut-1, commands.length)); - log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, waitingFor); + log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); } w.await(); } @@ -909,7 +916,7 @@ public class Session extends SessionInvoker protected <T> Future<T> invoke(Method m, Class<T> klass) { - synchronized (commands) + synchronized (commandsLock) { int command = commandsOut; ResultFuture<T> future = new ResultFuture<T>(klass); @@ -1019,7 +1026,7 @@ public class Session extends SessionInvoker { log.debug("Closing [%s] in state [%s]", this, state); } - synchronized (commands) + synchronized (commandsLock) { switch(state) { @@ -1043,7 +1050,7 @@ public class Session extends SessionInvoker protected void awaitClose() { - Waiter w = new Waiter(commands, timeout); + Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED) { checkFailoverRequired("close() was interrupted by failover."); @@ -1063,7 +1070,7 @@ public class Session extends SessionInvoker public void closed() { - synchronized (commands) + synchronized (commandsLock) { if (closing || getException() != null) { @@ -1074,7 +1081,7 @@ public class Session extends SessionInvoker state = DETACHED; } - commands.notifyAll(); + commandsLock.notifyAll(); synchronized (results) { @@ -1171,9 +1178,9 @@ public class Session extends SessionInvoker //prevent them waiting for timeout for 60 seconds //and possibly preventing failover proceeding _failoverRequired.set(true); - synchronized (commands) + synchronized (commandsLock) { - commands.notifyAll(); + commandsLock.notifyAll(); } synchronized (results) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index 028b912ba1..cabff1cd13 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -45,10 +45,15 @@ public class SessionDelegate method.dispatch(ssn, this); } - public void command(Session ssn, Method method) { + public void command(Session ssn, Method method) + { + command(ssn, method, !method.hasPayload()); + } + public void command(Session ssn, Method method, boolean processed) + { ssn.identify(method); method.dispatch(ssn, this); - if (!method.hasPayload()) + if (processed) { ssn.processed(method); } diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java index be129a67cc..a7b36bc98c 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java @@ -87,8 +87,9 @@ public class ConnectionFactoryProperties { if (_log.isTraceEnabled()) { - _log.trace("setConnectionURL(" + connectionURL + ")"); + _log.trace("setConnectionURL(" + Util.maskUrlForLog(connectionURL) + ")"); } + _hasBeenUpdated = true; this._connectionURL = connectionURL; } diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java index d56f520db4..363af1bbcd 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java @@ -425,11 +425,6 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable */ public void setConnectionURL(final String connectionURL) { - if (_log.isTraceEnabled()) - { - _log.trace("setConnectionURL(" + connectionURL + ")"); - } - _raProperties.setConnectionURL(connectionURL); } diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java index b927aaa0be..3957fa9660 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java @@ -34,8 +34,10 @@ import javax.naming.Reference; import javax.naming.Referenceable; import javax.transaction.TransactionManager; +import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.ra.admin.QpidQueue; import org.apache.qpid.ra.admin.QpidTopic; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -181,4 +183,19 @@ public class Util { return (object == null ? "null" : object.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(object))) ; } + + + public static String maskUrlForLog(final String url) + { + String results = null; + + try + { + results = new AMQConnectionURL(url).toString(); + } + catch(Exception ignore){} + + return (results == null) ? url : results; + } + } diff --git a/qpid/java/module.xml b/qpid/java/module.xml index 349fb91e4a..a3b8606525 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -264,7 +264,7 @@ <classpath refid="module.class.path"/> </javac> - <!-- copy any non java src files into the build tree, e.g. log4j.properties --> + <!-- copy any non java src files into the build tree, e.g. properties files --> <copy todir="${module.classes}" verbose="true"> <fileset dir="${module.src}"> <exclude name="**/*.java"/> @@ -285,7 +285,7 @@ <classpath refid="module.test.path"/> </javac> - <!-- copy any non java src files into the build tree, e.g. log4j.properties --> + <!-- copy any non java src files into the build tree, e.g. properties files --> <copy todir="${module.test.classes}" verbose="true"> <fileset dir="${module.test.src}"> <exclude name="**/*.java"/> diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java index 5c5ad66777..d91b9b9263 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java @@ -1,24 +1,3 @@ -package org.apache.qpid.client.prefetch; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -39,6 +18,26 @@ import org.slf4j.LoggerFactory; * under the License. * */ +package org.apache.qpid.client.prefetch; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + public class PrefetchBehaviourTest extends QpidBrokerTestCase { protected static final Logger _logger = LoggerFactory.getLogger(PrefetchBehaviourTest.class); @@ -132,44 +131,66 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase //wait for the other consumer to finish to ensure it completes ok _logger.debug("waiting for async consumer to complete"); assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS)); - assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get()); + assertFalse("Unexpected exception during async message processing",_exceptionCaught.get()); } /** - * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer. - * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them. - * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue. - * Try to receive all 10 messages. + * This test was originally known as AMQConnectionTest#testPrefetchSystemProperty. + * */ - public void testConnectionStop() throws Exception + public void testMessagesAreDistributedBetweenConsumersWithLowPrefetch() throws Exception { - setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10"); - Connection con = getConnection(); - con.start(); - Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}"); - - MessageProducer prod = ssn.createProducer(queue); - for (int i=0; i<10;i++) + Queue queue = getTestQueue(); + + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); + + Connection connection = getConnection(); + connection.start(); + // Create Consumer A + Session consSessA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumerA = consSessA.createConsumer(queue); + + // ensure message delivery to consumer A is started (required for 0-8..0-9-1) + final Message msg = consumerA.receiveNoWait(); + assertNull(msg); + + Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + sendMessage(producerSession, queue, 3); + + // Create Consumer B + MessageConsumer consumerB = null; + if (isBroker010()) + { + // 0-10 prefetch is per consumer so we create Consumer B on the same session as Consumer A + consumerB = consSessA.createConsumer(queue); + } + else { - prod.send(ssn.createTextMessage("Msg" + i)); + // 0-8..0-9-1 prefetch is per session so we create Consumer B on a separate session + Session consSessB = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumerB = consSessB.createConsumer(queue); } - MessageConsumer consumer = ssn.createConsumer(queue); - // This is to ensure we get the first client to prefetch. - Message msg = consumer.receive(1000); - assertNotNull("The first consumer should get one message",msg); - con.stop(); - - Connection con2 = getConnection(); - con2.start(); - Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer2 = ssn2.createConsumer(queue); - for (int i=0; i<9;i++) + // As message delivery to consumer A is already started, the first two messages should + // now be with consumer A. The last message will still be on the Broker as consumer A's + // credit is exhausted and message delivery for consumer B is not yet running. + + // As described by QPID-3747, for 0-10 we *must* check Consumer B before Consumer A. + // If we were to reverse the order, the SessionComplete will restore Consumer A's credit, + // and the third message could be delivered to either Consumer A or Consumer B. + + // Check that consumer B gets the last (third) message. + final Message msgConsumerB = consumerB.receive(1500); + assertNotNull("Consumer B should have received a message", msgConsumerB); + assertEquals("Consumer B received message with unexpected index", 2, msgConsumerB.getIntProperty(INDEX)); + + // Now check that consumer A has indeed got the first two messages. + for (int i = 0; i < 2; i++) { - TextMessage m = (TextMessage)consumer2.receive(1000); - assertNotNull("The second consumer should get 9 messages, but received only " + i,m); + final Message msgConsumerA = consumerA.receive(1500); + assertNotNull("Consumer A should have received a message " + i, msgConsumerA); + assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX)); } } - } + diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java index 5f758061d5..a258e4267d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Date; import java.util.Iterator; import java.util.List; + import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; @@ -37,9 +38,13 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.management.common.mbeans.ManagedConnection; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ManagedConnectionMBeanTest extends QpidBrokerTestCase { + private static final Logger LOGGER = LoggerFactory.getLogger(ManagedConnectionMBeanTest.class); + /** * JMX helper. */ @@ -120,9 +125,10 @@ public class ManagedConnectionMBeanTest extends QpidBrokerTestCase _connection.close(); + LOGGER.debug("Querying JMX for number of open connections"); connections = _jmxUtils.getManagedConnections("test"); assertNotNull("Connection MBean is not found", connections); - assertEquals("Unexpected number of connection mbeans", 0, connections.size()); + assertEquals("Unexpected number of connection mbeans after connection closed", 0, connections.size()); } public void testCommit() throws Exception @@ -218,13 +224,13 @@ public class ManagedConnectionMBeanTest extends QpidBrokerTestCase mBean.rollbackTransactions(channelId.intValue()); Message m = consumer.receive(1000l); - assertNull("Unexpected message received", m); + assertNull("Unexpected message received: " + String.valueOf(m), m); producerSession.commit(); _connection.start(); m = consumer.receive(1000l); - assertNull("Unexpected message received", m); + assertNull("Unexpected message received after commit " + String.valueOf(m), m); } public void testAuthorisedId() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java index ec96f778f6..07faf1ef3e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java @@ -31,6 +31,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; @@ -195,7 +196,7 @@ public class ExchangeLoggingTest extends AbstractTestLogging ExchangeDeleteBody body = registry.createExchangeDeleteBody(0, new AMQShortString(_name), false, true); - AMQFrame exchangeDeclare = body.generateFrame(0); + AMQFrame exchangeDeclare = body.generateFrame(((AMQSession)_session).getChannelId()); ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeleteOkBody.class); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java new file mode 100644 index 0000000000..a179b96768 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java @@ -0,0 +1,324 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.AMQException; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.AMQBindingURL; + +import javax.jms.*; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; + +public class MessageProtocolConversionTest extends QpidBrokerTestCase +{ + + private static final int TIMEOUT = 1500; + private Connection _connection_0_9_1; + private Connection _connection_0_10; + + private static final boolean BOOLEAN_TEST_VAL = true; + private static final byte BYTE_TEST_VAL = (byte) 4; + private static final byte[] BYTES_TEST_VAL = {5, 4, 3, 2, 1}; + private static final char CHAR_TEST_VAL = 'x'; + private static final double DOUBLE_TEST_VAL = Double.MAX_VALUE; + private static final float FLOAT_TEST_VAL = Float.MAX_VALUE; + private static final int INT_TEST_VAL = -73; + private static final long LONG_TEST_VAL = Long.MIN_VALUE / 2l; + private static final short SHORT_TEST_VAL = -586; + private static final String STRING_TEST_VAL = "This is a test text message"; + + @Override + public void setUp() throws Exception + { + super.setUp(); + setTestSystemProperty(ClientProperties.AMQP_VERSION, "0-10"); + _connection_0_10 = getConnection(); + setTestSystemProperty(ClientProperties.AMQP_VERSION, "0-9-1"); + _connection_0_9_1 = getConnection(); + } + + public void test0_9_1_to_0_10_conversion() throws JMSException, AMQException + { + doConversionTests(_connection_0_9_1, _connection_0_10); + } + + public void test_0_10_to_0_9_1_conversion() throws JMSException, AMQException + { + + doConversionTests(_connection_0_10, _connection_0_9_1); + } + + private void doConversionTests(Connection producerConn, Connection consumerConn) throws JMSException, AMQException + { + Session producerSession = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = getTestQueue(); + + MessageProducer producer = producerSession.createProducer(queue); + MessageConsumer consumer = consumerSession.createConsumer(queue); + + consumerConn.start(); + producerConn.start(); + + // Text Message + + Message m = producerSession.createTextMessage(STRING_TEST_VAL); + producer.send(m); + m = consumer.receive(TIMEOUT); + + assertNotNull("Expected text message did not arrive", m); + assertTrue("Received message not an instance of TextMessage (" + m.getClass().getName() + " instead)", m instanceof TextMessage); + assertEquals("Message text not as expected", STRING_TEST_VAL, ((TextMessage) m).getText()); + + // Map Message + + MapMessage mapMessage = producerSession.createMapMessage(); + mapMessage.setBoolean("boolean", BOOLEAN_TEST_VAL); + mapMessage.setByte("byte", BYTE_TEST_VAL); + mapMessage.setBytes("bytes", BYTES_TEST_VAL); + mapMessage.setChar("char", CHAR_TEST_VAL); + mapMessage.setDouble("double", DOUBLE_TEST_VAL); + mapMessage.setFloat("float", FLOAT_TEST_VAL); + mapMessage.setInt("int", INT_TEST_VAL); + mapMessage.setLong("long", LONG_TEST_VAL); + mapMessage.setShort("short", SHORT_TEST_VAL); + mapMessage.setString("string", STRING_TEST_VAL); + + producer.send(mapMessage); + + m = consumer.receive(TIMEOUT); + + assertNotNull("Expected map message message did not arrive", m); + assertTrue("Received message not an instance of MapMessage (" + m.getClass().getName() + " instead)", m instanceof MapMessage); + MapMessage receivedMapMessage = (MapMessage) m; + assertEquals("Map message boolean value not as expected", BOOLEAN_TEST_VAL, receivedMapMessage.getBoolean("boolean")); + assertEquals("Map message byte value not as expected", BYTE_TEST_VAL, receivedMapMessage.getByte("byte")); + assertTrue("Map message bytes value not as expected", Arrays.equals(BYTES_TEST_VAL, receivedMapMessage.getBytes("bytes"))); + assertEquals("Map message char value not as expected", CHAR_TEST_VAL, receivedMapMessage.getChar("char")); + assertEquals("Map message double value not as expected", DOUBLE_TEST_VAL, receivedMapMessage.getDouble("double")); + assertEquals("Map message float value not as expected", FLOAT_TEST_VAL, receivedMapMessage.getFloat("float")); + assertEquals("Map message int value not as expected", INT_TEST_VAL, receivedMapMessage.getInt("int")); + assertEquals("Map message long value not as expected", LONG_TEST_VAL, receivedMapMessage.getLong("long")); + assertEquals("Map message short value not as expected", SHORT_TEST_VAL, receivedMapMessage.getShort("short")); + assertEquals("Map message string value not as expected", STRING_TEST_VAL, receivedMapMessage.getString("string")); + ArrayList expectedNames = Collections.list(mapMessage.getMapNames()); + Collections.sort(expectedNames); + ArrayList actualNames = Collections.list(receivedMapMessage.getMapNames()); + Collections.sort(actualNames); + assertEquals("Map message keys not as expected", expectedNames, actualNames); + + // Stream Message + + StreamMessage streamMessage = producerSession.createStreamMessage(); + streamMessage.writeString(STRING_TEST_VAL); + streamMessage.writeShort(SHORT_TEST_VAL); + streamMessage.writeLong(LONG_TEST_VAL); + streamMessage.writeInt(INT_TEST_VAL); + streamMessage.writeFloat(FLOAT_TEST_VAL); + streamMessage.writeDouble(DOUBLE_TEST_VAL); + streamMessage.writeChar(CHAR_TEST_VAL); + streamMessage.writeBytes(BYTES_TEST_VAL); + streamMessage.writeByte(BYTE_TEST_VAL); + streamMessage.writeBoolean(BOOLEAN_TEST_VAL); + + producer.send(streamMessage); + + m = consumer.receive(TIMEOUT); + + assertNotNull("Expected stream message message did not arrive", m); + assertTrue("Received message not an instance of StreamMessage (" + m.getClass().getName() + " instead)", m instanceof StreamMessage); + StreamMessage receivedStreamMessage = (StreamMessage) m; + + assertEquals("Stream message read string not as expected", STRING_TEST_VAL, receivedStreamMessage.readString()); + assertEquals("Stream message read short not as expected", SHORT_TEST_VAL, receivedStreamMessage.readShort()); + assertEquals("Stream message read long not as expected", LONG_TEST_VAL, receivedStreamMessage.readLong()); + assertEquals("Stream message read int not as expected", INT_TEST_VAL, receivedStreamMessage.readInt()); + assertEquals("Stream message read float not as expected", FLOAT_TEST_VAL, receivedStreamMessage.readFloat()); + assertEquals("Stream message read double not as expected", DOUBLE_TEST_VAL, receivedStreamMessage.readDouble()); + assertEquals("Stream message read char not as expected", CHAR_TEST_VAL, receivedStreamMessage.readChar()); + byte[] bytesVal = new byte[BYTES_TEST_VAL.length]; + receivedStreamMessage.readBytes(bytesVal); + assertTrue("Stream message read bytes not as expected", Arrays.equals(BYTES_TEST_VAL, bytesVal)); + assertEquals("Stream message read byte not as expected", BYTE_TEST_VAL, receivedStreamMessage.readByte()); + assertEquals("Stream message read boolean not as expected", BOOLEAN_TEST_VAL, receivedStreamMessage.readBoolean()); + + try + { + receivedStreamMessage.readByte(); + fail("Unexpected remaining bytes in stream message"); + } + catch(MessageEOFException e) + { + // pass + } + + // Object Message + + ObjectMessage objectMessage = producerSession.createObjectMessage(); + objectMessage.setObject(STRING_TEST_VAL); + + producer.send(objectMessage); + + m = consumer.receive(TIMEOUT); + + assertNotNull("Expected object message message did not arrive", m); + assertTrue("Received message not an instance of ObjectMessage (" + m.getClass().getName() + " instead)", m instanceof ObjectMessage); + ObjectMessage receivedObjectMessage = (ObjectMessage) m; + assertEquals("Object message value not as expected", STRING_TEST_VAL, receivedObjectMessage.getObject()); + + + // Bytes Message + + BytesMessage bytesMessage = producerSession.createBytesMessage(); + bytesMessage.writeBytes(BYTES_TEST_VAL); + + producer.send(bytesMessage); + + m = consumer.receive(TIMEOUT); + + assertNotNull("Expected bytes message message did not arrive", m); + assertTrue("Received message not an instance of BytesMessage (" + m.getClass().getName() + " instead)", m instanceof BytesMessage); + BytesMessage receivedBytesMessage = (BytesMessage) m; + bytesVal = new byte[BYTES_TEST_VAL.length]; + receivedBytesMessage.readBytes(bytesVal); + assertTrue("Bytes message read bytes not as expected", Arrays.equals(BYTES_TEST_VAL, bytesVal)); + + try + { + receivedBytesMessage.readByte(); + fail("Unexpected remaining bytes in stream message"); + } + catch(MessageEOFException e) + { + // pass + } + + // Headers / properties tests + + Message msg = producerSession.createMessage(); + msg.setJMSCorrelationID("testCorrelationId"); + msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); + msg.setJMSPriority(7); + msg.setJMSType("testType"); + + msg.setBooleanProperty("boolean", BOOLEAN_TEST_VAL); + msg.setByteProperty("byte", BYTE_TEST_VAL); + msg.setDoubleProperty("double", DOUBLE_TEST_VAL); + msg.setFloatProperty("float", FLOAT_TEST_VAL); + msg.setIntProperty("int", INT_TEST_VAL); + msg.setLongProperty("long", LONG_TEST_VAL); + msg.setShortProperty("short", SHORT_TEST_VAL); + msg.setStringProperty("string", STRING_TEST_VAL); + + producer.send(msg); + + m = consumer.receive(TIMEOUT); + assertNotNull("Expected message did not arrive", m); + assertEquals("JMSMessageID differs", msg.getJMSMessageID(), m.getJMSMessageID()); + assertEquals("JMSCorrelationID differs",msg.getJMSCorrelationID(),m.getJMSCorrelationID()); + assertEquals("JMSDeliveryMode differs",msg.getJMSDeliveryMode(),m.getJMSDeliveryMode()); + assertEquals("JMSPriority differs",msg.getJMSPriority(),m.getJMSPriority()); + assertEquals("JMSType differs",msg.getJMSType(),m.getJMSType()); + + assertEquals("Message boolean property not as expected", BOOLEAN_TEST_VAL, m.getBooleanProperty("boolean")); + assertEquals("Message byte property not as expected", BYTE_TEST_VAL, m.getByteProperty("byte")); + assertEquals("Message double property not as expected", DOUBLE_TEST_VAL, m.getDoubleProperty("double")); + assertEquals("Message float property not as expected", FLOAT_TEST_VAL, m.getFloatProperty("float")); + assertEquals("Message int property not as expected", INT_TEST_VAL, m.getIntProperty("int")); + assertEquals("Message long property not as expected", LONG_TEST_VAL, m.getLongProperty("long")); + assertEquals("Message short property not as expected", SHORT_TEST_VAL, m.getShortProperty("short")); + assertEquals("Message string property not as expected", STRING_TEST_VAL, m.getStringProperty("string")); + + ArrayList<String> sentPropNames = Collections.list(msg.getPropertyNames()); + Collections.sort(sentPropNames); + ArrayList<String> receivedPropNames = Collections.list(m.getPropertyNames()); + Collections.sort(receivedPropNames); + + // Shouldn't really need to do this, the client should be hiding these from us + removeSyntheticProperties(sentPropNames); + removeSyntheticProperties(receivedPropNames); + + assertEquals("Property names were not as expected", sentPropNames, receivedPropNames); + + // Test Reply To Queue + + Destination replyToDestination = producerSession.createTemporaryQueue(); + MessageConsumer replyToConsumer = producerSession.createConsumer(replyToDestination); + msg = producerSession.createMessage(); + msg.setJMSReplyTo(replyToDestination); + producer.send(msg); + + m = consumer.receive(TIMEOUT); + assertNotNull("Expected message did not arrive", m); + assertNotNull("Message does not have ReplyTo set", m.getJMSReplyTo()); + + MessageProducer responseProducer = consumerSession.createProducer(m.getJMSReplyTo()); + responseProducer.send(consumerSession.createMessage()); + + assertNotNull("Expected response message did not arrive", replyToConsumer.receive(TIMEOUT)); + + // Test Reply To Topic + + replyToDestination = producerSession.createTemporaryTopic(); + replyToConsumer = producerSession.createConsumer(replyToDestination); + msg = producerSession.createMessage(); + msg.setJMSReplyTo(replyToDestination); + producer.send(msg); + + m = consumer.receive(TIMEOUT); + assertNotNull("Expected message did not arrive", m); + assertNotNull("Message does not have ReplyTo set", m.getJMSReplyTo()); + + responseProducer = consumerSession.createProducer(m.getJMSReplyTo()); + responseProducer.send(consumerSession.createMessage()); + + assertNotNull("Expected response message did not arrive", replyToConsumer.receive(TIMEOUT)); + + + } + + private void removeSyntheticProperties(ArrayList<String> propNames) + { + Iterator<String> nameIter = propNames.iterator(); + while(nameIter.hasNext()) + { + String propName = nameIter.next(); + if(propName.startsWith("x-jms") || propName.startsWith("JMS_QPID")) + { + nameIter.remove(); + } + } + } + + @Override + public void tearDown() throws Exception + { + _connection_0_9_1.close(); + _connection_0_10.close(); + super.tearDown(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java index 08a932eba1..11e1da18b3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java @@ -147,7 +147,6 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase assertNotNull("Consumer 2 should have received first message", cs2Received); - cs1Received.acknowledge(); cs2Received.acknowledge(); Message cs2Received2 = consumer2.receive(1000); @@ -156,6 +155,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase assertEquals("Differing groups", cs2Received2.getStringProperty("group"), cs2Received.getStringProperty("group")); + cs1Received.acknowledge(); Message cs1Received2 = consumer1.receive(1000); assertNotNull("Consumer 1 should have received second message", cs1Received2); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java index bf4dbcb19f..181fe9d34e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java @@ -20,10 +20,11 @@ package org.apache.qpid.server.queue; import java.util.Arrays; -import java.util.Calendar; import java.util.HashMap; -import java.util.Locale; import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -128,11 +129,10 @@ public class SortedQueueTest extends QpidBrokerTestCase final TestConsumerThread consumerThread = new TestConsumerThread(sessionMode, queue); consumerThread.start(); - final Calendar cal = Calendar.getInstance(Locale.UK); for(String value : VALUES) { - final Message msg = _producerSession.createTextMessage(String.valueOf(cal.getTimeInMillis())); + final Message msg = _producerSession.createMessage(); msg.setStringProperty(TEST_SORT_KEY, value); producer.send(msg); _producerSession.commit(); @@ -273,22 +273,22 @@ public class SortedQueueTest extends QpidBrokerTestCase _consumerConnection.start(); //Receive 3 in sorted order - received = receiveAndValidateMessage(consumer, "1"); + received = assertReceiveAndValidateMessage(consumer, "1"); received.acknowledge(); - received = receiveAndValidateMessage(consumer, "2"); + received = assertReceiveAndValidateMessage(consumer, "2"); received.acknowledge(); - received = receiveAndValidateMessage(consumer, "3"); + received = assertReceiveAndValidateMessage(consumer, "3"); received.acknowledge(); //Send 1 sendAndCommitMessage(producer,"4"); //Receive 1 and recover - received = receiveAndValidateMessage(consumer, "4"); + received = assertReceiveAndValidateMessage(consumer, "4"); consumerSession.recover(); //Receive same 1 - received = receiveAndValidateMessage(consumer, "4"); + received = assertReceiveAndValidateMessage(consumer, "4"); received.acknowledge(); //Send 3 out of order @@ -296,33 +296,39 @@ public class SortedQueueTest extends QpidBrokerTestCase sendAndCommitMessage(producer,"6"); sendAndCommitMessage(producer,"5"); - //Receive 1 of 3 (out of order due to pre-fetch) and recover - received = receiveAndValidateMessage(consumer, "7"); + //Receive 1 of 3 (possibly out of order due to pre-fetch) + final Message messageBeforeRollback = assertReceiveMessage(consumer); consumerSession.recover(); if (isBroker010()) { //Receive 3 in sorted order (not as per JMS recover) - received = receiveAndValidateMessage(consumer, "5"); + received = assertReceiveAndValidateMessage(consumer, "5"); received.acknowledge(); - received = receiveAndValidateMessage(consumer, "6"); + received = assertReceiveAndValidateMessage(consumer, "6"); received.acknowledge(); - received = receiveAndValidateMessage(consumer, "7"); + received = assertReceiveAndValidateMessage(consumer, "7"); received.acknowledge(); } else { - //Receive 3 in partial sorted order due to recover - received = receiveAndValidateMessage(consumer, "7"); + //First message will be the one rolled-back (as per JMS spec). + final String messageKeyDeliveredBeforeRollback = messageBeforeRollback.getStringProperty(TEST_SORT_KEY); + received = assertReceiveAndValidateMessage(consumer, messageKeyDeliveredBeforeRollback); received.acknowledge(); - received = receiveAndValidateMessage(consumer, "5"); + + //Remaining two messages will be sorted + final SortedSet<String> keys = new TreeSet<String>(Arrays.asList("5", "6", "7")); + keys.remove(messageKeyDeliveredBeforeRollback); + + received = assertReceiveAndValidateMessage(consumer, keys.first()); received.acknowledge(); - received = receiveAndValidateMessage(consumer, "6"); + received = assertReceiveAndValidateMessage(consumer, keys.last()); received.acknowledge(); } } - protected Queue createQueue() throws AMQException, JMSException + private Queue createQueue() throws AMQException, JMSException { final Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put(AMQQueueFactory.QPID_QUEUE_SORT_KEY, TEST_SORT_KEY); @@ -345,15 +351,22 @@ public class SortedQueueTest extends QpidBrokerTestCase _producerSession.commit(); } - private Message receiveAndValidateMessage(final MessageConsumer consumer, final String expectedKey) throws JMSException + private Message assertReceiveAndValidateMessage(final MessageConsumer consumer, final String expectedKey) throws JMSException { - final Message received = (TextMessage) consumer.receive(10000); - assertNotNull("Received message is unexpectedly null", received); + final Message received = assertReceiveMessage(consumer); assertEquals("Received message with unexpected sorted key value", expectedKey, received.getStringProperty(TEST_SORT_KEY)); return received; } + private Message assertReceiveMessage(final MessageConsumer consumer) + throws JMSException + { + final Message received = (TextMessage) consumer.receive(10000); + assertNotNull("Received message is unexpectedly null", received); + return received; + } + private class TestConsumerThread extends Thread { private boolean _stopped = false; @@ -388,9 +401,8 @@ public class SortedQueueTest extends QpidBrokerTestCase conn.start(); - TextMessage msg; - Calendar cal = Calendar.getInstance(Locale.UK); - while((msg = (TextMessage) consumer.receive(1000)) != null) + Message msg; + while((msg = consumer.receive(1000)) != null) { if(_sessionType == Session.SESSION_TRANSACTED) { @@ -427,9 +439,7 @@ public class SortedQueueTest extends QpidBrokerTestCase } _count++; - LOGGER.debug("Message consumed at : " + cal.getTimeInMillis()); LOGGER.debug("Message consumed with key: " + msg.getStringProperty(TEST_SORT_KEY)); - LOGGER.debug("Message consumed with text: " + msg.getText()); LOGGER.debug("Message consumed with consumed index: " + _consumed); } @@ -439,8 +449,8 @@ public class SortedQueueTest extends QpidBrokerTestCase } catch(JMSException e) { - e.printStackTrace(); - } + LOGGER.error("Exception in listener", e); + } } public synchronized boolean isStopped() diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java index 12039caf25..e6461c8267 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java @@ -70,6 +70,11 @@ public class AcknowledgeTest extends QpidBrokerTestCase // These should all end up being prefetched by session sendMessage(_consumerSession, _queue, 1); + if(!transacted) + { + ((AMQSession)_consumerSession).sync(); + } + assertEquals("Wrong number of messages on queue", 1, ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java deleted file mode 100644 index 474a425b28..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.client; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TopicSession; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQConnectionTest extends QpidBrokerTestCase -{ - protected static AMQConnection _connection; - protected static AMQTopic _topic; - protected static AMQQueue _queue; - private static QueueSession _queueSession; - private static TopicSession _topicSession; - protected static final Logger _logger = LoggerFactory.getLogger(AMQConnectionTest.class); - - protected void setUp() throws Exception - { - super.setUp(); - createConnection(); - _topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic")); - _queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue")); - } - - @Override - protected void tearDown() throws Exception - { - _connection.close(); - super.tearDown(); //To change body of overridden methods use File | Settings | File Templates. - } - - protected void createConnection() throws Exception - { - _connection = (AMQConnection) getConnection("guest", "guest"); - } - - /** - * Simple tests to check we can create TopicSession and QueueSession ok - * And that they throw exceptions where appropriate as per JMS spec - */ - - public void testCreateQueueSession() throws JMSException - { - createQueueSession(); - } - - private void createQueueSession() throws JMSException - { - _queueSession = _connection.createQueueSession(false, AMQSession.NO_ACKNOWLEDGE); - } - - public void testCreateTopicSession() throws JMSException - { - createTopicSession(); - } - - private void createTopicSession() throws JMSException - { - _topicSession = _connection.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); - } - - public void testTopicSessionCreateBrowser() throws JMSException - { - createTopicSession(); - try - { - _topicSession.createBrowser(_queue); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testTopicSessionCreateQueue() throws JMSException - { - createTopicSession(); - try - { - _topicSession.createQueue("abc"); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testTopicSessionCreateTemporaryQueue() throws JMSException - { - createTopicSession(); - try - { - _topicSession.createTemporaryQueue(); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testQueueSessionCreateTemporaryTopic() throws JMSException - { - createQueueSession(); - try - { - _queueSession.createTemporaryTopic(); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testQueueSessionCreateTopic() throws JMSException - { - createQueueSession(); - try - { - _queueSession.createTopic("abc"); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testQueueSessionDurableSubscriber() throws JMSException - { - createQueueSession(); - try - { - _queueSession.createDurableSubscriber(_topic, "abc"); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testQueueSessionUnsubscribe() throws JMSException - { - createQueueSession(); - try - { - _queueSession.unsubscribe("abc"); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testPrefetchSystemProperty() throws Exception - { - _connection.close(); - setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); - - createConnection(); - _connection.start(); - // Create two consumers on different sessions - Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumerA = consSessA.createConsumer(_queue); - - Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = producerSession.createProducer(_queue); - - // Send 3 messages - for (int i = 0; i < 3; i++) - { - producer.send(producerSession.createTextMessage("test")); - } - producerSession.commit(); - - MessageConsumer consumerB = null; - // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer. - if (!isBroker010()) - { - Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - consumerB = consSessB.createConsumer(_queue); - } - else - { - consumerB = consSessA.createConsumer(_queue); - } - - Message msg; - // Check that consumer A has 2 messages - for (int i = 0; i < 2; i++) - { - msg = consumerA.receive(1500); - assertNotNull("Consumer A should receive 2 messages",msg); - } - - msg = consumerA.receive(1500); - assertNull("Consumer A should not have received a 3rd message",msg); - - // Check that consumer B has the last message - msg = consumerB.receive(1500); - assertNotNull("Consumer B should have received the message",msg); - } - - - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java deleted file mode 100644 index 5f3daa407a..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.test.unit.client; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionURL; - -public class AMQSSLConnectionTest extends AMQConnectionTest -{ - private static final String KEYSTORE = "test-profiles/test_resources/ssl/java_client_keystore.jks"; - private static final String KEYSTORE_PASSWORD = "password"; - private static final String TRUSTSTORE = "test-profiles/test_resources/ssl/java_client_truststore.jks"; - private static final String TRUSTSTORE_PASSWORD = "password"; - - @Override - protected void setUp() throws Exception - { - setTestClientSystemProperty("profile.use_ssl", "true"); - setConfigurationProperty("connector.ssl.enabled", "true"); - setConfigurationProperty("connector.ssl.sslOnly", "true"); - super.setUp(); - } - - protected void createConnection() throws Exception - { - - final String sslPrototypeUrl = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" + - "?ssl='true'&ssl_verify_hostname='false'" + - "&key_store='%s'&key_store_password='%s'" + - "&trust_store='%s'&trust_store_password='%s'" + - "'"; - - final String url = String.format(sslPrototypeUrl,System.getProperty("test.port.ssl"), - KEYSTORE,KEYSTORE_PASSWORD,TRUSTSTORE,TRUSTSTORE_PASSWORD); - - _connection = (AMQConnection) getConnection(new AMQConnectionURL(url)); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java index 93cceb1048..c33dde53b7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java @@ -101,10 +101,4 @@ public class AMQSessionTest extends QpidBrokerTestCase assertEquals("Queue names should match from QueueReceiver with selector", _queue.getQueueName(), receiver.getQueue().getQueueName()); } - public static void stopVmBrokers() - { - _queue = null; - _topic = null; - _session = null; - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java new file mode 100644 index 0000000000..ef90ab8ffe --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSession; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Ensures that queue specific session factory method {@link QueueConnection#createQueueSession()} create sessions + * of type {@link QueueSession} and that those sessions correctly restrict the available JMS operations + * operations to exclude those applicable to only topics. + * + * @see TopicSessionFactoryTest + */ +public class QueueSessionFactoryTest extends QpidBrokerTestCase +{ + public void testQueueSessionIsNotATopicSession() throws Exception + { + QueueSession queueSession = getQueueSession(); + assertFalse(queueSession instanceof TopicSession); + } + + public void testQueueSessionCannotCreateTemporaryTopics() throws Exception + { + QueueSession queueSession = getQueueSession(); + try + { + queueSession.createTemporaryTopic(); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createTemporaryTopic from QueueSession", s.getMessage()); + } + } + + public void testQueueSessionCannotCreateTopics() throws Exception + { + QueueSession queueSession = getQueueSession(); + try + { + queueSession.createTopic("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createTopic from QueueSession", s.getMessage()); + } + } + + public void testQueueSessionCannotCreateDurableSubscriber() throws Exception + { + QueueSession queueSession = getQueueSession(); + Topic topic = getTestTopic(); + + try + { + queueSession.createDurableSubscriber(topic, "abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createDurableSubscriber from QueueSession", s.getMessage()); + } + } + + public void testQueueSessionCannoutUnsubscribe() throws Exception + { + QueueSession queueSession = getQueueSession(); + try + { + queueSession.unsubscribe("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call unsubscribe from QueueSession", s.getMessage()); + } + } + + private QueueSession getQueueSession() throws Exception + { + QueueConnection queueConnection = (QueueConnection)getConnection(); + return queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java new file mode 100644 index 0000000000..6397f15e0a --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import javax.jms.Queue; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Ensures that topic specific session factory method {@link TopicConnection#createTopicSession()} create sessions + * of type {@link TopicSession} and that those sessions correctly restrict the available JMS operations + * operations to exclude those applicable to only queues. + * + * @see QueueSessionFactoryTest + */ +public class TopicSessionFactoryTest extends QpidBrokerTestCase +{ + public void testTopicSessionIsNotAQueueSession() throws Exception + { + TopicSession topicSession = getTopicSession(); + assertFalse(topicSession instanceof QueueSession); + } + + public void testTopicSessionCannotCreateCreateBrowser() throws Exception + { + TopicSession topicSession = getTopicSession(); + Queue queue = getTestQueue(); + try + { + topicSession.createBrowser(queue); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createBrowser from TopicSession", s.getMessage()); + } + } + + public void testTopicSessionCannotCreateQueues() throws Exception + { + TopicSession topicSession = getTopicSession(); + try + { + topicSession.createQueue("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createQueue from TopicSession", s.getMessage()); + } + } + + public void testTopicSessionCannotCreateTemporaryQueues() throws Exception + { + TopicSession topicSession = getTopicSession(); + try + { + topicSession.createTemporaryQueue(); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createTemporaryQueue from TopicSession", s.getMessage()); + } + } + + private TopicSession getTopicSession() throws Exception + { + TopicConnection topicConnection = (TopicConnection)getConnection(); + return topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index d3d9cf2984..e948aaffb3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -721,7 +721,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("Match", false); producer.send(msg); - + ((AMQSession)session).sync(); // should be 1 or 2 messages on queue now // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose"); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index f680a20288..9a8da14f83 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -46,6 +46,7 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; +import javax.jms.Topic; import javax.naming.InitialContext; import javax.naming.NamingException; @@ -56,6 +57,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; @@ -1101,6 +1103,15 @@ public class QpidBrokerTestCase extends QpidTestCase return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName()); } + /** + * Return a Topic specific for this test. + * Uses getTestQueueName() as the name of the topic + * @return + */ + public Topic getTestTopic() + { + return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, getTestQueueName()); + } protected void tearDown() throws java.lang.Exception { diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index d8c463b810..015aa19a8e 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -23,10 +23,6 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateEx // QPID-3576: Java client issue. MessageConsumer#close() time-out. org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testDeleteOptions -//This test requires SSL, but SSL is only enabled for the C++ broker in the cpp.ssl test profile -//which runs *all* the tests with SSL, so this one can be excluded safely enough -org.apache.qpid.test.unit.client.AMQSSLConnectionTest#* - org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* org.apache.qpid.client.ResetMessageListenerTest#* @@ -178,3 +174,5 @@ org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseGroupAssignm org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseWithRelease org.apache.qpid.server.queue.MessageGroupQueueTest#testGroupAssignmentSurvivesEmpty +// CPP Broker does not implement message conversion from 0-9-1 +org.apache.qpid.server.message.MessageProtocolConversionTest#* diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes index 057d7c2c44..ba96232fe3 100644 --- a/qpid/java/test-profiles/JavaPre010Excludes +++ b/qpid/java/test-profiles/JavaPre010Excludes @@ -21,8 +21,10 @@ //Exclude the following from brokers using the 0-8/0-9/0-9-1 protocols //====================================================================== -// This test requires a broker capable of 0-8/0-9/0-9-1 and 0-10 concurrently +// These tests requires a broker capable of 0-8/0-9/0-9-1 and 0-10 concurrently org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend +org.apache.qpid.server.message.MessageProtocolConversionTest#* + // QPID-2478 test fails when run against broker using 0-8/9 org.apache.qpid.test.client.message.JMSDestinationTest#testGetDestinationWithCustomExchange |