summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-12 20:19:50 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-12 20:19:50 +0000
commitf0f34d8914a623dc6fe42038ca443ea560a64a28 (patch)
tree30896cdbb2b640aff604d326a79bf933e3c63d18 /java/broker/src
parenteb12fe81f29d3d50598eafd01a4eda1fad6275cb (diff)
downloadqpid-python-f0f34d8914a623dc6fe42038ca443ea560a64a28.tar.gz
More fixing up of refactoring stuff; getting all maven tests passing and implementing management methods
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655630 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java242
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java4
11 files changed, 269 insertions, 53 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index deb6ac8d94..64694c2686 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -495,9 +495,6 @@ public class AMQChannel
// Deliver Message
deliveryContext.requeue(unacked);
- // Should we allow access To the DM to directy deliver the message?
- // As we don't need to check for Consumers or worry about incrementing the message count?
- // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index d8a8cfb6d1..41d7f6c067 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -279,6 +279,12 @@ public class Main
ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
}
+
+ if(connectorConfig.useBiasedWrites)
+ {
+ System.setProperty("org.apache.qpid.use_write_biased_pool","true");
+ }
+
int port = connectorConfig.port;
String portStr = commandLine.getOptionValue("p");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index cb3aa5259a..db3a05eb52 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -49,6 +49,7 @@ public class TxAck implements TxnOp
public void update(long deliveryTag, boolean multiple)
{
+ _unacked.clear();
if (!multiple)
{
if(_individual == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index e790493a82..1df93dd0d8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -278,9 +278,17 @@ public class AMQMessage implements Filterable<AMQException>
}
/** Threadsafe. Increment the reference count on the message. */
- public void incrementReference()
+ public boolean incrementReference()
{
- _referenceCount.incrementAndGet();
+ if(_referenceCount.incrementAndGet() <= 1)
+ {
+ _referenceCount.decrementAndGet();
+ return false;
+ }
+ else
+ {
+ return true;
+ }
// if (_log.isDebugEnabled())
// {
// _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
@@ -298,6 +306,7 @@ public class AMQMessage implements Filterable<AMQException>
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
+
int count = _referenceCount.decrementAndGet();
// note that the operation of decrementing the reference count and then removing the message does not
@@ -306,6 +315,11 @@ public class AMQMessage implements Filterable<AMQException>
// not relying on the all the increments having taken place before the delivery manager decrements.
if (count == 0)
{
+ // set the reference count way below 0 so that we can detect that the message has been deleted
+ // this is to guard against the message being spontaneously recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being removed.
+ _referenceCount.set(Integer.MIN_VALUE/2);
+
try
{
// if (_log.isDebugEnabled())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 98583c03d4..780cd49834 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -103,10 +103,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void start();
- void enqueueMovedMessages(final StoreContext storeContext, final List<QueueEntry> foundMessagesList);
-
-
-
long getMaximumMessageSize();
void setMaximumMessageSize(long value);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index e751212272..431b76754f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -38,7 +38,7 @@ public class AMQQueueFactory
throws AMQException
{
- final int priorities = arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
+ final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
if(priorities > 1)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 39c28a7355..dd967a7cb1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -34,7 +34,8 @@ public interface QueueEntry extends Comparable<QueueEntry>
AVAILABLE,
ACQUIRED,
EXPIRED,
- DEQUEUED
+ DEQUEUED,
+ DELETED
}
public static interface StateChangeListener
@@ -62,7 +63,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
}
- public final class DeletedState extends EntryState
+ public final class DequeuedState extends EntryState
{
public State getState()
@@ -71,6 +72,16 @@ public interface QueueEntry extends Comparable<QueueEntry>
}
}
+
+ public final class DeletedState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.DELETED;
+ }
+ }
+
public final class ExpiredState extends EntryState
{
@@ -113,6 +124,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
final static EntryState AVAILABLE_STATE = new AvailableState();
final static EntryState DELETED_STATE = new DeletedState();
+ final static EntryState DEQUEUED_STATE = new DequeuedState();
final static EntryState EXPIRED_STATE = new ExpiredState();
final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
@@ -165,7 +177,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
void restoreCredit();
- void discard(StoreContext storeContext) throws AMQException;
+ void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
boolean isQueueDeleted();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 6225501c72..e3338996e1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -252,13 +252,18 @@ public class QueueEntryImpl implements QueueEntry
public void dequeue(final StoreContext storeContext) throws FailedDequeueException
{
+ EntryState state = _state;
-
- getQueue().dequeue(storeContext, this);
- if(_stateChangeListeners != null)
+ if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- notifyStateChange(_state.getState() , QueueEntry.State.DEQUEUED);
+ getQueue().dequeue(storeContext, this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
+ }
+
}
+
}
private void notifyStateChange(final State oldState, final State newState)
@@ -271,8 +276,10 @@ public class QueueEntryImpl implements QueueEntry
public void dispose(final StoreContext storeContext) throws MessageCleanupException
{
- getMessage().decrementReference(storeContext);
- delete();
+ if(delete())
+ {
+ getMessage().decrementReference(storeContext);
+ }
}
public void restoreCredit()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 7e2f0fc56a..25116817d7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -5,6 +5,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -135,9 +136,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
-
-
-
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
@@ -428,6 +426,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(entry.immediateAndNotDelivered())
{
dequeue(storeContext, entry);
+ entry.dispose(storeContext);
}
else if(!entry.isAcquired())
{
@@ -582,7 +581,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_virtualHost.getMessageStore().dequeueMessage(storeContext, getName(), msg.getMessageId());
}
- entry.delete();
+ //entry.dispose(storeContext);
}
catch (MessageCleanupException e)
@@ -685,7 +684,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public long getOldestMessageArrivalTime()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ QueueEntry entry = getOldestQueueEntry();
+ return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
+ }
+
+ protected QueueEntry getOldestQueueEntry()
+ {
+ return _entries.next(_entries.getHead());
}
public boolean isDeleted()
@@ -809,35 +814,217 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- public void moveMessagesToAnotherQueue(long fromMessageId,
- long toMessageId,
+ public void moveMessagesToAnotherQueue(final long fromMessageId,
+ final long toMessageId,
String queueName,
StoreContext storeContext)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ MessageStore store = getVirtualHost().getMessageStore();
+
+
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+
+ public boolean accept(QueueEntry entry)
+ {
+ final long messageId = entry.getMessage().getMessageId();
+ return (messageId >= fromMessageId)
+ && (messageId <= toMessageId)
+ && entry.acquire();
+ }
+
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
+
+
+ try
+ {
+ store.beginTran(storeContext);
+
+ // Move the messages in on the message store.
+ for (QueueEntry entry : entries)
+ {
+ AMQMessage message = entry.getMessage();
+
+ if(message.isPersistent() && toQueue.isDurable())
+ {
+ store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId());
+ }
+ // dequeue does not decrement the refence count
+ entry.dequeue(storeContext);
+ }
+
+ // Commit and flush the move transcations.
+ try
+ {
+ store.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+ }
+ catch (AMQException e)
+ {
+ try
+ {
+ store.abortTran(storeContext);
+ }
+ catch (AMQException rollbackEx)
+ {
+ _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
+ }
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
+ for (QueueEntry entry : entries)
+ {
+ toQueue.enqueue(storeContext, entry.getMessage());
+
+ }
+ }
+ catch (MessageCleanupException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+
}
- public void copyMessagesToAnotherQueue(long fromMessageId,
- long toMessageId,
+ public void copyMessagesToAnotherQueue(final long fromMessageId,
+ final long toMessageId,
String queueName,
- StoreContext storeContext)
+ final StoreContext storeContext)
{
- //To change body of implemented methods use File | Settings | File Templates.
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ MessageStore store = getVirtualHost().getMessageStore();
+
+
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+
+ public boolean accept(QueueEntry entry)
+ {
+ final long messageId = entry.getMessage().getMessageId();
+ if((messageId >= fromMessageId)
+ && (messageId <= toMessageId))
+ {
+ if(!entry.isDeleted())
+ {
+ return entry.getMessage().incrementReference();
+ }
+ }
+
+ return false;
+ }
+
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
+
+ try
+ {
+ store.beginTran(storeContext);
+
+ // Move the messages in on the message store.
+ for (QueueEntry entry : entries)
+ {
+ AMQMessage message = entry.getMessage();
+
+ if(message.isReferenced() && message.isPersistent() && toQueue.isDurable())
+ {
+ store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId());
+ }
+ }
+
+ // Commit and flush the move transcations.
+ try
+ {
+ store.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+ }
+ catch (AMQException e)
+ {
+ try
+ {
+ store.abortTran(storeContext);
+ }
+ catch (AMQException rollbackEx)
+ {
+ _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
+ }
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
+ for (QueueEntry entry : entries)
+ {
+ if(entry.getMessage().isReferenced())
+ {
+ toQueue.enqueue(storeContext, entry.getMessage());
+ }
+ }
+ }
+ catch (MessageCleanupException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+
}
public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
{
- //To change body of implemented methods use File | Settings | File Templates.
- }
+ try
+ {
+ QueueEntryIterator queueListIterator = _entries.iterator();
- public void enqueueMovedMessages(final StoreContext storeContext, final List<QueueEntry> foundMessagesList)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
+ while(queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+
+ final long messageId = node.getMessage().getMessageId();
+
+ if((messageId >= fromMessageId)
+ && (messageId <= toMessageId)
+ && !node.isDeleted()
+ && node.acquire())
+ {
+ node.discard(storeContext);
+ }
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
public void quiesce()
{
@@ -868,10 +1055,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if(!node.isDeleted() && node.acquire())
{
- node.dequeue(storeContext);
-
- node.dispose(storeContext);
-
+ node.discard(storeContext);
noDeletes = false;
}
@@ -889,10 +1073,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if(!node.isDeleted() && node.acquire())
{
- node.dequeue(storeContext);
-
- node.dispose(storeContext);
-
+ node.discard(storeContext);
count++;
}
@@ -1046,9 +1227,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(node.acquire())
{
final StoreContext reapingStoreContext = new StoreContext();
- node.dequeue(reapingStoreContext);
- node.dispose(reapingStoreContext);
-
+ node.discard(reapingStoreContext);
}
}
QueueEntry newNode = _entries.next(node);
@@ -1209,10 +1388,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(!node.isDeleted() && node.expired() && node.acquire())
{
- node.dequeue(storeContext);
-
- node.dispose(storeContext);
-
+ node.discard(storeContext);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
index 711497c799..fb70984d99 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -203,12 +203,15 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime);
- _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+
_persistent = contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
((BasicContentHeaderProperties) contentHeaderBody.properties).getDeliveryMode() == 2;
+ _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+
+
populateFromMessageMetaData(mmd);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
index 23aaf56876..83e348b9f2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
@@ -97,6 +97,10 @@ public class ConnectorConfiguration
defaultValue = "false")
public boolean _multiThreadNIO;
+ @Configured(path = "advanced.useWriteBiasedPool",
+ defaultValue = "true")
+ public boolean useBiasedWrites;
+
public IoAcceptor createAcceptor()
{