From a5ab5bb5e7d6961997e4dbc9489e77ef0f6f1c98 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 14 Aug 2011 20:24:57 +0000 Subject: Reapplying 1-0 sandbox changes to correct branch (hopefully) git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1157654 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/Main.java | 40 ++++ .../MultiVersionProtocolEngineFactory.java | 2 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 247 ++++++++++++--------- .../auth/manager/AuthenticationManager.java | 5 +- .../PrincipalDatabaseAuthenticationManager.java | 5 + .../sasl/amqplain/AmqPlainSaslServerFactory.java | 4 +- .../sasl/anonymous/AnonymousSaslServerFactory.java | 4 +- .../auth/sasl/plain/PlainSaslServerFactory.java | 4 +- .../qpid/server/transport/ServerSession.java | 23 +- .../qpid/server/virtualhost/VirtualHostImpl.java | 21 +- .../exchange/AbstractHeadersExchangeTestBase.java | 12 +- .../apache/qpid/server/txn/MockServerMessage.java | 28 +-- 12 files changed, 245 insertions(+), 150 deletions(-) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 71cf17ed60..41aa22b8ef 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -30,6 +30,7 @@ import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.logging.*; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; @@ -77,6 +78,8 @@ public class Main private static final int IPV4_ADDRESS_LENGTH = 4; private static final char IPV4_LITERAL_SEPARATOR = '.'; + private java.util.logging.Logger FRAME_LOGGER; + private java.util.logging.Logger RAW_LOGGER; protected static class InitException extends Exception { @@ -249,6 +252,10 @@ public class Main protected void startup() throws Exception { + + FRAME_LOGGER = updateLogger("FRM", "qpid-frame.log"); + RAW_LOGGER = updateLogger("RAW", "qpid-raw.log"); + final String QpidHome = System.getProperty(QPID_HOME); final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE); final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath())); @@ -449,6 +456,39 @@ public class Main } + private java.util.logging.Logger updateLogger(final String logType, String logFileName) throws IOException + { + java.util.logging.Logger logger = java.util.logging.Logger.getLogger(logType); + logger.setLevel(Level.FINE); + Formatter formatter = new Formatter() + { + @Override + public String format(final LogRecord record) + { + + return "[" + record.getMillis() + " "+ logType +"]\t" + record.getMessage() + "\n"; + } + }; + for(Handler handler : logger.getHandlers()) + { + logger.removeHandler(handler); + } + Handler handler = new ConsoleHandler(); + + handler.setLevel(Level.FINE); + handler.setFormatter(formatter); + + logger.addHandler(handler); + + + handler = new FileHandler(logFileName, true); + handler.setLevel(Level.FINE); + handler.setFormatter(formatter); + + logger.addHandler(handler); + return logger; + } + private void parsePortArray(Set ports, String[] portStr) throws InitException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 75358c42d9..500a34b4a8 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -35,7 +35,7 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory ; - public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 }; + public enum VERSION { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 }; private static final Set ALL_VERSIONS = new HashSet(Arrays.asList(VERSION.values())); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b003152db6..3a89194eb9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -188,7 +188,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //TODO : persist creation time private long _createTime = System.currentTimeMillis(); private ConfigurationPlugin _queueConfiguration; - + private final boolean _isTopic; protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map arguments) @@ -234,10 +234,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _exclusive = exclusive; _virtualHost = virtualHost; _entries = entryListFactory.createQueueEntryList(this); - _arguments = arguments; + _arguments = arguments == null ? Collections.EMPTY_MAP : arguments; _id = virtualHost.getConfigStore().createId(); + _isTopic = arguments != null && arguments.containsKey("topic"); + _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); _logSubject = new QueueLogSubject(this); @@ -329,7 +331,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _exclusive; } - + public void setExclusive(boolean exclusive) throws AMQException { _exclusive = exclusive; @@ -404,8 +406,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { throw new AMQSecurityException("Permission denied"); } - - + + if (hasExclusiveSubscriber()) { throw new ExistingExclusiveSubscription(); @@ -435,14 +437,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener subscription.setNoLocal(_nolocal); } _subscriptionList.add(subscription); - + //Increment consumerCountHigh if necessary. (un)registerSubscription are both //synchronized methods so we don't need additional synchronization here if(_counsumerCountHigh.get() < getConsumerCount()) { _counsumerCountHigh.incrementAndGet(); } - + if (isDeleted()) { subscription.queueDeleted(this); @@ -488,6 +490,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // queue. This is because the delete method uses the subscription set which has just been cleared subscription.queueDeleted(this); } + + if(_subscriptionList.size() == 0 && _isTopic) + { + clearQueue(); + } } } @@ -514,10 +521,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener break; } } - + reconfigure(); } - + private void reconfigure() { //Reconfigure the queue for to reflect this new binding. @@ -543,7 +550,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void removeBinding(final Binding binding) { _bindings.remove(binding); - + reconfigure(); } @@ -570,101 +577,104 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException { - incrementTxnEnqueueStats(message); - incrementQueueCount(); - incrementQueueSize(message); _totalMessagesReceived.incrementAndGet(); - QueueEntry entry; Subscription exclusiveSub = _exclusiveSubscriber; - - if (exclusiveSub != null) + if(!_isTopic || _subscriptionList.size()!=0) { - exclusiveSub.getSendLock(); + incrementTxnEnqueueStats(message); + incrementQueueCount(); + incrementQueueSize(message); - try - { - entry = _entries.add(message); + QueueEntry entry; - deliverToSubscription(exclusiveSub, entry); - } - finally + if (exclusiveSub != null) { - exclusiveSub.releaseSendLock(); - } - } - else - { - entry = _entries.add(message); - /* - - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message + exclusiveSub.getSendLock(); - */ - SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); - SubscriptionList.SubscriptionNode nextNode = node.getNext(); - if (nextNode == null) - { - nextNode = _subscriptionList.getHead().getNext(); - } - while (nextNode != null) - { - if (_lastSubscriptionNode.compareAndSet(node, nextNode)) + try { - break; + entry = _entries.add(message); + + deliverToSubscription(exclusiveSub, entry); } - else + finally { - node = _lastSubscriptionNode.get(); - nextNode = node.getNext(); - if (nextNode == null) - { - nextNode = _subscriptionList.getHead().getNext(); - } + exclusiveSub.releaseSendLock(); } } + else + { + entry = _entries.add(message); + /* - // always do one extra loop after we believe we've finished - // this catches the case where we *just* miss an update - int loops = 2; + iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message - while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0) - { + */ + SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); + SubscriptionList.SubscriptionNode nextNode = node.getNext(); if (nextNode == null) { - loops--; - nextNode = _subscriptionList.getHead(); + nextNode = _subscriptionList.getHead().getNext(); } - else + while (nextNode != null) { - // if subscription at end, and active, offer - Subscription sub = nextNode.getSubscription(); - deliverToSubscription(sub, entry); + if (_lastSubscriptionNode.compareAndSet(node, nextNode)) + { + break; + } + else + { + node = _lastSubscriptionNode.get(); + nextNode = node.getNext(); + if (nextNode == null) + { + nextNode = _subscriptionList.getHead().getNext(); + } + } } - nextNode = nextNode.getNext(); + // always do one extra loop after we believe we've finished + // this catches the case where we *just* miss an update + int loops = 2; + + while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0) + { + if (nextNode == null) + { + loops--; + nextNode = _subscriptionList.getHead(); + } + else + { + // if subscription at end, and active, offer + Subscription sub = nextNode.getSubscription(); + deliverToSubscription(sub, entry); + } + nextNode = nextNode.getNext(); + + } } - } - if (!(entry.isAcquired() || entry.isDeleted())) - { - checkSubscriptionsNotAheadOfDelivery(entry); + if (!(entry.isAcquired() || entry.isDeleted())) + { + checkSubscriptionsNotAheadOfDelivery(entry); - deliverAsync(); - } + deliverAsync(); + } - if(_managedObject != null) - { - _managedObject.checkForNotification(entry.getMessage()); - } + if(_managedObject != null) + { + _managedObject.checkForNotification(entry.getMessage()); + } - if(action != null) - { - action.onEnqueue(entry); + if(action != null) + { + action.onEnqueue(entry); + } } - } private void deliverToSubscription(final Subscription sub, final QueueEntry entry) @@ -720,20 +730,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { getAtomicQueueCount().incrementAndGet(); } - + private void incrementTxnEnqueueStats(final ServerMessage message) { SessionConfig session = message.getSessionConfig(); - + if(session !=null && session.isTransactional()) { _msgTxnEnqueues.incrementAndGet(); _byteTxnEnqueues.addAndGet(message.getSize()); } } - + private void incrementTxnDequeueStats(QueueEntry entry) - { + { _msgTxnDequeues.incrementAndGet(); _byteTxnDequeues.addAndGet(entry.getSize()); } @@ -747,6 +757,40 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener incrementUnackedMsgCount(); sub.send(entry); + + if(_isTopic) + { + if(allSubscriptionsAhead(entry) && entry.acquire()) + { + entry.discard(); + } + } + } + + private boolean allSubscriptionsAhead(final QueueEntry entry) + { + SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); + while(subIter.advance() && !entry.isAcquired()) + { + final Subscription subscription = subIter.getNode().getSubscription(); + if(!subscription.isClosed()) + { + QueueContext context = (QueueContext) subscription.getQueueContext(); + if(context != null) + { + QueueEntry subnode = context._lastSeenEntry; + if(subnode.compareTo(entry)<0) + { + return false; + } + } + else + { + return false; + } + } + } + return true; } private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException @@ -831,7 +875,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _deliveredMessages.decrementAndGet(); } - + if(sub != null && sub.isSessionTransactional()) { incrementTxnDequeueStats(entry); @@ -888,7 +932,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _subscriptionList.size(); } - + public int getConsumerCountHigh() { return _counsumerCountHigh.get(); @@ -1298,7 +1342,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } public long clearQueue() throws AMQException - { + { return clear(0l); } @@ -1309,7 +1353,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { throw new AMQSecurityException("Permission denied: queue " + getName()); } - + QueueEntryIterator queueListIterator = _entries.iterator(); long count = 0; @@ -1376,7 +1420,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { throw new AMQSecurityException("Permission denied: " + getName()); } - + if (!_deleted.getAndSet(true)) { @@ -1670,12 +1714,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { sub.getSendLock(); atTail = attemptDelivery(sub); - if (atTail && sub.isAutoClose()) + if (atTail && getNextAvailableEntry(sub) == null) { - unregisterSubscription(sub); - - sub.confirmAutoClose(); - + sub.queueEmpty(); } else if (!atTail) { @@ -1696,6 +1737,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { advanceAllSubscriptions(); } + return atTail; } @@ -1854,13 +1896,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (extraLoops == 0) { - deliveryIncomplete = false; - if (sub.isAutoClose()) + if(getNextAvailableEntry(sub) == null) { - unregisterSubscription(sub); - - sub.confirmAutoClose(); + sub.queueEmpty(); } + deliveryIncomplete = false; + } else { @@ -2166,22 +2207,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _dequeueSize.get(); } - + public long getByteTxnEnqueues() { return _byteTxnEnqueues.get(); } - + public long getByteTxnDequeues() { return _byteTxnDequeues.get(); } - + public long getMsgTxnEnqueues() { return _msgTxnEnqueues.get(); } - + public long getMsgTxnDequeues() { return _msgTxnDequeues.get(); @@ -2218,21 +2259,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _unackedMsgCountHigh.get(); } - + public long getUnackedMessageCount() { return _unackedMsgCount.get(); } - + public void decrementUnackedMsgCount() { _unackedMsgCount.decrementAndGet(); } - + private void incrementUnackedMsgCount() { long unackedMsgCount = _unackedMsgCount.incrementAndGet(); - + long unackedMsgCountHigh; while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get())) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java index bc771162fd..15651b088b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java @@ -23,14 +23,17 @@ package org.apache.qpid.server.security.auth.manager; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import org.apache.qpid.amqp_1_0.transport.CallbackHanderSource; import org.apache.qpid.common.Closeable; import org.apache.qpid.server.security.auth.AuthenticationResult; -public interface AuthenticationManager extends Closeable +public interface AuthenticationManager extends Closeable, CallbackHanderSource { String getMechanisms(); SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException; AuthenticationResult authenticate(SaslServer server, byte[] response); + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index 2a967f02af..bbd90b4d53 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -234,4 +234,9 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan { Security.removeProvider(PROVIDER_NAME); } + + public CallbackHandler getHandler(String mechanism) + { + return _callbackHandlerMap.get(mechanism); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java index 67d20136bf..eb463ee722 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java @@ -45,9 +45,9 @@ public class AmqPlainSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE)) + props.containsKey(Sasl.POLICY_NOACTIVE))) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java index 6032255870..5706cbf49e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java @@ -47,10 +47,10 @@ public class AnonymousSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || props.containsKey(Sasl.POLICY_NODICTIONARY) || props.containsKey(Sasl.POLICY_NOACTIVE) || - props.containsKey(Sasl.POLICY_NOANONYMOUS)) + props.containsKey(Sasl.POLICY_NOANONYMOUS))) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java index f0dd9eeb6d..11b0f26e05 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java @@ -45,9 +45,9 @@ public class PlainSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE)) + props.containsKey(Sasl.POLICY_NOACTIVE))) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 540ad3fffd..c4e2f1a322 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -106,7 +106,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo new ConcurrentSkipListMap(); private ServerTransaction _transaction; - + private final AtomicLong _txnStarts = new AtomicLong(0); private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); @@ -138,7 +138,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig) { super(connection, delegate, name, expiry); - _connectionConfig = connConfig; + _connectionConfig = connConfig; _transaction = new AutoCommitTransaction(this.getMessageStore()); _principal = new UserPrincipal(connection.getAuthorizationID()); _reference = new WeakReference(this); @@ -331,7 +331,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo } } - public void removeDispositionListener(Method method) + public void removeDispositionListener(Method method) { _messageDispositionListenerMap.remove(method.getId()); } @@ -351,7 +351,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo { task.doTask(this); } - + CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE()); } @@ -396,7 +396,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void unregister(Subscription_0_10 sub) { - _subscriptions.remove(sub.getConsumerTag().toString()); + _subscriptions.remove(sub.getName()); try { sub.getSendLock(); @@ -417,7 +417,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo sub.releaseSendLock(); } } - + public boolean isTransactional() { // this does not look great but there should only be one "non-transactional" @@ -435,7 +435,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void commit() { _transaction.commit(); - + _txnCommits.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); @@ -444,13 +444,13 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void rollback() { _transaction.rollback(); - + _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); } - + private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -460,7 +460,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo _txnCount.compareAndSet(0,1); } } - + private void decrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -490,7 +490,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo { return _txnCount.get(); } - + public Principal getPrincipal() { return _principal; @@ -606,7 +606,6 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo return (LogSubject) this; } - @Override public String toLogString() { return "[" + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 6ec1c512e5..49678055f9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -20,12 +20,7 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.TimerTask; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -36,7 +31,6 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -63,6 +57,7 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.protocol.v1_0.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; @@ -115,6 +110,7 @@ public class VirtualHostImpl implements VirtualHost private final long _createTime = System.currentTimeMillis(); private final ConcurrentHashMap _links = new ConcurrentHashMap(); private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; + private final Map _linkRegistry = new HashMap(); public IConnectionRegistry getConnectionRegistry() { @@ -663,6 +659,17 @@ public class VirtualHostImpl implements VirtualHost } } + public synchronized LinkRegistry getLinkRegistry(String remoteContainerId) + { + LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId); + if(linkRegistry == null) + { + linkRegistry = new LinkRegistry(); + _linkRegistry.put(remoteContainerId, linkRegistry); + } + return linkRegistry; + } + public ConfigStore getConfigStore() { return getApplicationRegistry().getConfigStore(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 7b58966a4c..8755724cfc 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -100,14 +100,14 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase return bind(queueName, queueName, getHeadersMap(bindings)); } - + protected void unbind(TestQueue queue, String... bindings) throws AMQException { String queueName = queue.getName(); //TODO - check this exchange.onUnbind(new Binding(null,queueName, queue, exchange, getHeadersMap(bindings))); } - + protected int getCount() { return count; @@ -120,7 +120,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase exchange.onBind(new Binding(null,key, queue, exchange, args)); return queue; } - + protected int route(Message m) throws AMQException { @@ -175,14 +175,14 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase } } - + static Map getHeadersMap(String... entries) { if(entries == null) { return null; } - + Map headers = new HashMap(); for (String s : entries) @@ -438,7 +438,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase return false; //To change body of implemented methods use File | Settings | File Templates. } - public void requeue(Subscription subscription) + public void requeue(Subscription subscription) { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index 64c62fd029..790511017a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -31,10 +31,10 @@ import org.apache.qpid.server.message.ServerMessage; /** * Mock Server Message allowing its persistent flag to be controlled from test. */ -class MockServerMessage implements ServerMessage +class MockServerMessage implements ServerMessage { /** - * + * */ private final boolean persistent; @@ -46,67 +46,67 @@ class MockServerMessage implements ServerMessage this.persistent = persistent; } - @Override + public boolean isPersistent() { return persistent; } - @Override - public MessageReference newReference() + + public MessageReference newReference() { throw new NotImplementedException(); } - @Override + public boolean isImmediate() { throw new NotImplementedException(); } - @Override + public long getSize() { throw new NotImplementedException(); } - @Override + public SessionConfig getSessionConfig() { throw new NotImplementedException(); } - @Override + public String getRoutingKey() { throw new NotImplementedException(); } - @Override + public AMQMessageHeader getMessageHeader() { throw new NotImplementedException(); } - @Override + public long getExpiration() { throw new NotImplementedException(); } - @Override + public int getContent(ByteBuffer buf, int offset) { throw new NotImplementedException(); } - @Override + public long getArrivalTime() { throw new NotImplementedException(); } - @Override + public Long getMessageNumber() { return 0L; -- cgit v1.2.1