summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-08-14 20:24:57 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-08-14 20:24:57 +0000
commita5ab5bb5e7d6961997e4dbc9489e77ef0f6f1c98 (patch)
tree94ebe90baf2bac2d8c43613fd40c7db1be3f5814
parent858ddcc441ca47636a710d93f5084146ce73476c (diff)
downloadqpid-python-a5ab5bb5e7d6961997e4dbc9489e77ef0f6f1c98.tar.gz
Reapplying 1-0 sandbox changes to correct branch (hopefully)
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1157654 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java40
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java247
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java21
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java28
12 files changed, 245 insertions, 150 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 71cf17ed60..41aa22b8ef 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.logging.*;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
@@ -77,6 +78,8 @@ public class Main
private static final int IPV4_ADDRESS_LENGTH = 4;
private static final char IPV4_LITERAL_SEPARATOR = '.';
+ private java.util.logging.Logger FRAME_LOGGER;
+ private java.util.logging.Logger RAW_LOGGER;
protected static class InitException extends Exception
{
@@ -249,6 +252,10 @@ public class Main
protected void startup() throws Exception
{
+
+ FRAME_LOGGER = updateLogger("FRM", "qpid-frame.log");
+ RAW_LOGGER = updateLogger("RAW", "qpid-raw.log");
+
final String QpidHome = System.getProperty(QPID_HOME);
final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
@@ -449,6 +456,39 @@ public class Main
}
+ private java.util.logging.Logger updateLogger(final String logType, String logFileName) throws IOException
+ {
+ java.util.logging.Logger logger = java.util.logging.Logger.getLogger(logType);
+ logger.setLevel(Level.FINE);
+ Formatter formatter = new Formatter()
+ {
+ @Override
+ public String format(final LogRecord record)
+ {
+
+ return "[" + record.getMillis() + " "+ logType +"]\t" + record.getMessage() + "\n";
+ }
+ };
+ for(Handler handler : logger.getHandlers())
+ {
+ logger.removeHandler(handler);
+ }
+ Handler handler = new ConsoleHandler();
+
+ handler.setLevel(Level.FINE);
+ handler.setFormatter(formatter);
+
+ logger.addHandler(handler);
+
+
+ handler = new FileHandler(logFileName, true);
+ handler.setLevel(Level.FINE);
+ handler.setFormatter(formatter);
+
+ logger.addHandler(handler);
+ return logger;
+ }
+
private void parsePortArray(Set<Integer> ports, String[] portStr)
throws InitException
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 75358c42d9..500a34b4a8 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -35,7 +35,7 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
;
- public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 };
+ public enum VERSION { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 };
private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b003152db6..3a89194eb9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -188,7 +188,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
private ConfigurationPlugin _queueConfiguration;
-
+ private final boolean _isTopic;
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
@@ -234,10 +234,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_exclusive = exclusive;
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
- _arguments = arguments;
+ _arguments = arguments == null ? Collections.EMPTY_MAP : arguments;
_id = virtualHost.getConfigStore().createId();
+ _isTopic = arguments != null && arguments.containsKey("topic");
+
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
_logSubject = new QueueLogSubject(this);
@@ -329,7 +331,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _exclusive;
}
-
+
public void setExclusive(boolean exclusive) throws AMQException
{
_exclusive = exclusive;
@@ -404,8 +406,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
throw new AMQSecurityException("Permission denied");
}
-
-
+
+
if (hasExclusiveSubscriber())
{
throw new ExistingExclusiveSubscription();
@@ -435,14 +437,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
subscription.setNoLocal(_nolocal);
}
_subscriptionList.add(subscription);
-
+
//Increment consumerCountHigh if necessary. (un)registerSubscription are both
//synchronized methods so we don't need additional synchronization here
if(_counsumerCountHigh.get() < getConsumerCount())
{
_counsumerCountHigh.incrementAndGet();
}
-
+
if (isDeleted())
{
subscription.queueDeleted(this);
@@ -488,6 +490,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// queue. This is because the delete method uses the subscription set which has just been cleared
subscription.queueDeleted(this);
}
+
+ if(_subscriptionList.size() == 0 && _isTopic)
+ {
+ clearQueue();
+ }
}
}
@@ -514,10 +521,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
break;
}
}
-
+
reconfigure();
}
-
+
private void reconfigure()
{
//Reconfigure the queue for to reflect this new binding.
@@ -543,7 +550,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
-
+
reconfigure();
}
@@ -570,101 +577,104 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
- incrementTxnEnqueueStats(message);
- incrementQueueCount();
- incrementQueueSize(message);
_totalMessagesReceived.incrementAndGet();
- QueueEntry entry;
Subscription exclusiveSub = _exclusiveSubscriber;
-
- if (exclusiveSub != null)
+ if(!_isTopic || _subscriptionList.size()!=0)
{
- exclusiveSub.getSendLock();
+ incrementTxnEnqueueStats(message);
+ incrementQueueCount();
+ incrementQueueSize(message);
- try
- {
- entry = _entries.add(message);
+ QueueEntry entry;
- deliverToSubscription(exclusiveSub, entry);
- }
- finally
+ if (exclusiveSub != null)
{
- exclusiveSub.releaseSendLock();
- }
- }
- else
- {
- entry = _entries.add(message);
- /*
-
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+ exclusiveSub.getSendLock();
- */
- SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
- SubscriptionList.SubscriptionNode nextNode = node.getNext();
- if (nextNode == null)
- {
- nextNode = _subscriptionList.getHead().getNext();
- }
- while (nextNode != null)
- {
- if (_lastSubscriptionNode.compareAndSet(node, nextNode))
+ try
{
- break;
+ entry = _entries.add(message);
+
+ deliverToSubscription(exclusiveSub, entry);
}
- else
+ finally
{
- node = _lastSubscriptionNode.get();
- nextNode = node.getNext();
- if (nextNode == null)
- {
- nextNode = _subscriptionList.getHead().getNext();
- }
+ exclusiveSub.releaseSendLock();
}
}
+ else
+ {
+ entry = _entries.add(message);
+ /*
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
+ iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
- while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
- {
+ */
+ SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
+ SubscriptionList.SubscriptionNode nextNode = node.getNext();
if (nextNode == null)
{
- loops--;
- nextNode = _subscriptionList.getHead();
+ nextNode = _subscriptionList.getHead().getNext();
}
- else
+ while (nextNode != null)
{
- // if subscription at end, and active, offer
- Subscription sub = nextNode.getSubscription();
- deliverToSubscription(sub, entry);
+ if (_lastSubscriptionNode.compareAndSet(node, nextNode))
+ {
+ break;
+ }
+ else
+ {
+ node = _lastSubscriptionNode.get();
+ nextNode = node.getNext();
+ if (nextNode == null)
+ {
+ nextNode = _subscriptionList.getHead().getNext();
+ }
+ }
}
- nextNode = nextNode.getNext();
+ // always do one extra loop after we believe we've finished
+ // this catches the case where we *just* miss an update
+ int loops = 2;
+
+ while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+ {
+ if (nextNode == null)
+ {
+ loops--;
+ nextNode = _subscriptionList.getHead();
+ }
+ else
+ {
+ // if subscription at end, and active, offer
+ Subscription sub = nextNode.getSubscription();
+ deliverToSubscription(sub, entry);
+ }
+ nextNode = nextNode.getNext();
+
+ }
}
- }
- if (!(entry.isAcquired() || entry.isDeleted()))
- {
- checkSubscriptionsNotAheadOfDelivery(entry);
+ if (!(entry.isAcquired() || entry.isDeleted()))
+ {
+ checkSubscriptionsNotAheadOfDelivery(entry);
- deliverAsync();
- }
+ deliverAsync();
+ }
- if(_managedObject != null)
- {
- _managedObject.checkForNotification(entry.getMessage());
- }
+ if(_managedObject != null)
+ {
+ _managedObject.checkForNotification(entry.getMessage());
+ }
- if(action != null)
- {
- action.onEnqueue(entry);
+ if(action != null)
+ {
+ action.onEnqueue(entry);
+ }
}
-
}
private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
@@ -720,20 +730,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
getAtomicQueueCount().incrementAndGet();
}
-
+
private void incrementTxnEnqueueStats(final ServerMessage message)
{
SessionConfig session = message.getSessionConfig();
-
+
if(session !=null && session.isTransactional())
{
_msgTxnEnqueues.incrementAndGet();
_byteTxnEnqueues.addAndGet(message.getSize());
}
}
-
+
private void incrementTxnDequeueStats(QueueEntry entry)
- {
+ {
_msgTxnDequeues.incrementAndGet();
_byteTxnDequeues.addAndGet(entry.getSize());
}
@@ -747,6 +757,40 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
incrementUnackedMsgCount();
sub.send(entry);
+
+ if(_isTopic)
+ {
+ if(allSubscriptionsAhead(entry) && entry.acquire())
+ {
+ entry.discard();
+ }
+ }
+ }
+
+ private boolean allSubscriptionsAhead(final QueueEntry entry)
+ {
+ SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
+ while(subIter.advance() && !entry.isAcquired())
+ {
+ final Subscription subscription = subIter.getNode().getSubscription();
+ if(!subscription.isClosed())
+ {
+ QueueContext context = (QueueContext) subscription.getQueueContext();
+ if(context != null)
+ {
+ QueueEntry subnode = context._lastSeenEntry;
+ if(subnode.compareTo(entry)<0)
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ return true;
}
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
@@ -831,7 +875,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_deliveredMessages.decrementAndGet();
}
-
+
if(sub != null && sub.isSessionTransactional())
{
incrementTxnDequeueStats(entry);
@@ -888,7 +932,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _subscriptionList.size();
}
-
+
public int getConsumerCountHigh()
{
return _counsumerCountHigh.get();
@@ -1298,7 +1342,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
public long clearQueue() throws AMQException
- {
+ {
return clear(0l);
}
@@ -1309,7 +1353,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
throw new AMQSecurityException("Permission denied: queue " + getName());
}
-
+
QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
@@ -1376,7 +1420,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
throw new AMQSecurityException("Permission denied: " + getName());
}
-
+
if (!_deleted.getAndSet(true))
{
@@ -1670,12 +1714,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
sub.getSendLock();
atTail = attemptDelivery(sub);
- if (atTail && sub.isAutoClose())
+ if (atTail && getNextAvailableEntry(sub) == null)
{
- unregisterSubscription(sub);
-
- sub.confirmAutoClose();
-
+ sub.queueEmpty();
}
else if (!atTail)
{
@@ -1696,6 +1737,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
advanceAllSubscriptions();
}
+
return atTail;
}
@@ -1854,13 +1896,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (extraLoops == 0)
{
- deliveryIncomplete = false;
- if (sub.isAutoClose())
+ if(getNextAvailableEntry(sub) == null)
{
- unregisterSubscription(sub);
-
- sub.confirmAutoClose();
+ sub.queueEmpty();
}
+ deliveryIncomplete = false;
+
}
else
{
@@ -2166,22 +2207,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _dequeueSize.get();
}
-
+
public long getByteTxnEnqueues()
{
return _byteTxnEnqueues.get();
}
-
+
public long getByteTxnDequeues()
{
return _byteTxnDequeues.get();
}
-
+
public long getMsgTxnEnqueues()
{
return _msgTxnEnqueues.get();
}
-
+
public long getMsgTxnDequeues()
{
return _msgTxnDequeues.get();
@@ -2218,21 +2259,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _unackedMsgCountHigh.get();
}
-
+
public long getUnackedMessageCount()
{
return _unackedMsgCount.get();
}
-
+
public void decrementUnackedMsgCount()
{
_unackedMsgCount.decrementAndGet();
}
-
+
private void incrementUnackedMsgCount()
{
long unackedMsgCount = _unackedMsgCount.incrementAndGet();
-
+
long unackedMsgCountHigh;
while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
index bc771162fd..15651b088b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
@@ -23,14 +23,17 @@ package org.apache.qpid.server.security.auth.manager;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.amqp_1_0.transport.CallbackHanderSource;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.security.auth.AuthenticationResult;
-public interface AuthenticationManager extends Closeable
+public interface AuthenticationManager extends Closeable, CallbackHanderSource
{
String getMechanisms();
SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
AuthenticationResult authenticate(SaslServer server, byte[] response);
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
index 2a967f02af..bbd90b4d53 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
@@ -234,4 +234,9 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan
{
Security.removeProvider(PROVIDER_NAME);
}
+
+ public CallbackHandler getHandler(String mechanism)
+ {
+ return _callbackHandlerMap.get(mechanism);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
index 67d20136bf..eb463ee722 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
@@ -45,9 +45,9 @@ public class AmqPlainSaslServerFactory implements SaslServerFactory
public String[] getMechanismNames(Map props)
{
- if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE))
+ props.containsKey(Sasl.POLICY_NOACTIVE)))
{
// returned array must be non null according to interface documentation
return new String[0];
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
index 6032255870..5706cbf49e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
@@ -47,10 +47,10 @@ public class AnonymousSaslServerFactory implements SaslServerFactory
public String[] getMechanismNames(Map props)
{
- if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
props.containsKey(Sasl.POLICY_NODICTIONARY) ||
props.containsKey(Sasl.POLICY_NOACTIVE) ||
- props.containsKey(Sasl.POLICY_NOANONYMOUS))
+ props.containsKey(Sasl.POLICY_NOANONYMOUS)))
{
// returned array must be non null according to interface documentation
return new String[0];
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
index f0dd9eeb6d..11b0f26e05 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
@@ -45,9 +45,9 @@ public class PlainSaslServerFactory implements SaslServerFactory
public String[] getMechanismNames(Map props)
{
- if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE))
+ props.containsKey(Sasl.POLICY_NOACTIVE)))
{
// returned array must be non null according to interface documentation
return new String[0];
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 540ad3fffd..c4e2f1a322 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -106,7 +106,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
private ServerTransaction _transaction;
-
+
private final AtomicLong _txnStarts = new AtomicLong(0);
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
@@ -138,7 +138,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
{
super(connection, delegate, name, expiry);
- _connectionConfig = connConfig;
+ _connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
_principal = new UserPrincipal(connection.getAuthorizationID());
_reference = new WeakReference(this);
@@ -331,7 +331,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
}
}
- public void removeDispositionListener(Method method)
+ public void removeDispositionListener(Method method)
{
_messageDispositionListenerMap.remove(method.getId());
}
@@ -351,7 +351,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
{
task.doTask(this);
}
-
+
CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE());
}
@@ -396,7 +396,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
public void unregister(Subscription_0_10 sub)
{
- _subscriptions.remove(sub.getConsumerTag().toString());
+ _subscriptions.remove(sub.getName());
try
{
sub.getSendLock();
@@ -417,7 +417,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
sub.releaseSendLock();
}
}
-
+
public boolean isTransactional()
{
// this does not look great but there should only be one "non-transactional"
@@ -435,7 +435,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
public void commit()
{
_transaction.commit();
-
+
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
@@ -444,13 +444,13 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
public void rollback()
{
_transaction.rollback();
-
+
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
}
-
+
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -460,7 +460,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
_txnCount.compareAndSet(0,1);
}
}
-
+
private void decrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -490,7 +490,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
{
return _txnCount.get();
}
-
+
public Principal getPrincipal()
{
return _principal;
@@ -606,7 +606,6 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
return (LogSubject) this;
}
- @Override
public String toLogString()
{
return "[" +
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 6ec1c512e5..49678055f9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -20,12 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TimerTask;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -36,7 +31,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -63,6 +57,7 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -115,6 +110,7 @@ public class VirtualHostImpl implements VirtualHost
private final long _createTime = System.currentTimeMillis();
private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
+ private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
public IConnectionRegistry getConnectionRegistry()
{
@@ -663,6 +659,17 @@ public class VirtualHostImpl implements VirtualHost
}
}
+ public synchronized LinkRegistry getLinkRegistry(String remoteContainerId)
+ {
+ LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId);
+ if(linkRegistry == null)
+ {
+ linkRegistry = new LinkRegistry();
+ _linkRegistry.put(remoteContainerId, linkRegistry);
+ }
+ return linkRegistry;
+ }
+
public ConfigStore getConfigStore()
{
return getApplicationRegistry().getConfigStore();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 7b58966a4c..8755724cfc 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -100,14 +100,14 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
return bind(queueName, queueName, getHeadersMap(bindings));
}
-
+
protected void unbind(TestQueue queue, String... bindings) throws AMQException
{
String queueName = queue.getName();
//TODO - check this
exchange.onUnbind(new Binding(null,queueName, queue, exchange, getHeadersMap(bindings)));
}
-
+
protected int getCount()
{
return count;
@@ -120,7 +120,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
exchange.onBind(new Binding(null,key, queue, exchange, args));
return queue;
}
-
+
protected int route(Message m) throws AMQException
{
@@ -175,14 +175,14 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
}
}
-
+
static Map<String,Object> getHeadersMap(String... entries)
{
if(entries == null)
{
return null;
}
-
+
Map<String,Object> headers = new HashMap<String,Object>();
for (String s : entries)
@@ -438,7 +438,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void requeue(Subscription subscription)
+ public void requeue(Subscription subscription)
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 64c62fd029..790511017a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -31,10 +31,10 @@ import org.apache.qpid.server.message.ServerMessage;
/**
* Mock Server Message allowing its persistent flag to be controlled from test.
*/
-class MockServerMessage implements ServerMessage
+class MockServerMessage implements ServerMessage<MockServerMessage>
{
/**
- *
+ *
*/
private final boolean persistent;
@@ -46,67 +46,67 @@ class MockServerMessage implements ServerMessage
this.persistent = persistent;
}
- @Override
+
public boolean isPersistent()
{
return persistent;
}
- @Override
- public MessageReference newReference()
+
+ public MessageReference<MockServerMessage> newReference()
{
throw new NotImplementedException();
}
- @Override
+
public boolean isImmediate()
{
throw new NotImplementedException();
}
- @Override
+
public long getSize()
{
throw new NotImplementedException();
}
- @Override
+
public SessionConfig getSessionConfig()
{
throw new NotImplementedException();
}
- @Override
+
public String getRoutingKey()
{
throw new NotImplementedException();
}
- @Override
+
public AMQMessageHeader getMessageHeader()
{
throw new NotImplementedException();
}
- @Override
+
public long getExpiration()
{
throw new NotImplementedException();
}
- @Override
+
public int getContent(ByteBuffer buf, int offset)
{
throw new NotImplementedException();
}
- @Override
+
public long getArrivalTime()
{
throw new NotImplementedException();
}
- @Override
+
public Long getMessageNumber()
{
return 0L;