diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-12 20:19:50 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-12 20:19:50 +0000 |
commit | f0f34d8914a623dc6fe42038ca443ea560a64a28 (patch) | |
tree | 30896cdbb2b640aff604d326a79bf933e3c63d18 /java/broker/src | |
parent | eb12fe81f29d3d50598eafd01a4eda1fad6275cb (diff) | |
download | qpid-python-f0f34d8914a623dc6fe42038ca443ea560a64a28.tar.gz |
More fixing up of refactoring stuff; getting all maven tests passing and implementing management methods
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655630 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
11 files changed, 269 insertions, 53 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index deb6ac8d94..64694c2686 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -495,9 +495,6 @@ public class AMQChannel // Deliver Message deliveryContext.requeue(unacked); - // Should we allow access To the DM to directy deliver the message? - // As we don't need to check for Consumers or worry about incrementing the message count? - // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index d8a8cfb6d1..41d7f6c067 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -279,6 +279,12 @@ public class Main ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); } + + if(connectorConfig.useBiasedWrites) + { + System.setProperty("org.apache.qpid.use_write_biased_pool","true"); + } + int port = connectorConfig.port; String portStr = commandLine.getOptionValue("p"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index cb3aa5259a..db3a05eb52 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -49,6 +49,7 @@ public class TxAck implements TxnOp public void update(long deliveryTag, boolean multiple) { + _unacked.clear(); if (!multiple) { if(_individual == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index e790493a82..1df93dd0d8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -278,9 +278,17 @@ public class AMQMessage implements Filterable<AMQException> } /** Threadsafe. Increment the reference count on the message. */ - public void incrementReference() + public boolean incrementReference() { - _referenceCount.incrementAndGet(); + if(_referenceCount.incrementAndGet() <= 1) + { + _referenceCount.decrementAndGet(); + return false; + } + else + { + return true; + } // if (_log.isDebugEnabled()) // { // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); @@ -298,6 +306,7 @@ public class AMQMessage implements Filterable<AMQException> */ public void decrementReference(StoreContext storeContext) throws MessageCleanupException { + int count = _referenceCount.decrementAndGet(); // note that the operation of decrementing the reference count and then removing the message does not @@ -306,6 +315,11 @@ public class AMQMessage implements Filterable<AMQException> // not relying on the all the increments having taken place before the delivery manager decrements. if (count == 0) { + // set the reference count way below 0 so that we can detect that the message has been deleted + // this is to guard against the message being spontaneously recreated (from the mgmt console) + // by copying from other queues at the same time as it is being removed. + _referenceCount.set(Integer.MIN_VALUE/2); + try { // if (_log.isDebugEnabled()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 98583c03d4..780cd49834 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -103,10 +103,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void start(); - void enqueueMovedMessages(final StoreContext storeContext, final List<QueueEntry> foundMessagesList); - - - long getMaximumMessageSize(); void setMaximumMessageSize(long value); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index e751212272..431b76754f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -38,7 +38,7 @@ public class AMQQueueFactory throws AMQException { - final int priorities = arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1; + final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1; if(priorities > 1) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 39c28a7355..dd967a7cb1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -34,7 +34,8 @@ public interface QueueEntry extends Comparable<QueueEntry> AVAILABLE, ACQUIRED, EXPIRED, - DEQUEUED + DEQUEUED, + DELETED } public static interface StateChangeListener @@ -62,7 +63,7 @@ public interface QueueEntry extends Comparable<QueueEntry> } - public final class DeletedState extends EntryState + public final class DequeuedState extends EntryState { public State getState() @@ -71,6 +72,16 @@ public interface QueueEntry extends Comparable<QueueEntry> } } + + public final class DeletedState extends EntryState + { + + public State getState() + { + return State.DELETED; + } + } + public final class ExpiredState extends EntryState { @@ -113,6 +124,7 @@ public interface QueueEntry extends Comparable<QueueEntry> final static EntryState AVAILABLE_STATE = new AvailableState(); final static EntryState DELETED_STATE = new DeletedState(); + final static EntryState DEQUEUED_STATE = new DequeuedState(); final static EntryState EXPIRED_STATE = new ExpiredState(); final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState(); @@ -165,7 +177,7 @@ public interface QueueEntry extends Comparable<QueueEntry> void restoreCredit(); - void discard(StoreContext storeContext) throws AMQException; + void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException; boolean isQueueDeleted(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 6225501c72..e3338996e1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -252,13 +252,18 @@ public class QueueEntryImpl implements QueueEntry public void dequeue(final StoreContext storeContext) throws FailedDequeueException { + EntryState state = _state; - - getQueue().dequeue(storeContext, this); - if(_stateChangeListeners != null) + if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { - notifyStateChange(_state.getState() , QueueEntry.State.DEQUEUED); + getQueue().dequeue(storeContext, this); + if(_stateChangeListeners != null) + { + notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); + } + } + } private void notifyStateChange(final State oldState, final State newState) @@ -271,8 +276,10 @@ public class QueueEntryImpl implements QueueEntry public void dispose(final StoreContext storeContext) throws MessageCleanupException { - getMessage().decrementReference(storeContext); - delete(); + if(delete()) + { + getMessage().decrementReference(storeContext); + } } public void restoreCredit() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 7e2f0fc56a..25116817d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -5,6 +5,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -135,9 +136,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private AtomicReference _asynchronousRunner = new AtomicReference(null); private AtomicInteger _deliveredMessages = new AtomicInteger(); - - - protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { @@ -428,6 +426,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if(entry.immediateAndNotDelivered()) { dequeue(storeContext, entry); + entry.dispose(storeContext); } else if(!entry.isAcquired()) { @@ -582,7 +581,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _virtualHost.getMessageStore().dequeueMessage(storeContext, getName(), msg.getMessageId()); } - entry.delete(); + //entry.dispose(storeContext); } catch (MessageCleanupException e) @@ -685,7 +684,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public long getOldestMessageArrivalTime() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + QueueEntry entry = getOldestQueueEntry(); + return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime(); + } + + protected QueueEntry getOldestQueueEntry() + { + return _entries.next(_entries.getHead()); } public boolean isDeleted() @@ -809,35 +814,217 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void moveMessagesToAnotherQueue(long fromMessageId, - long toMessageId, + public void moveMessagesToAnotherQueue(final long fromMessageId, + final long toMessageId, String queueName, StoreContext storeContext) { - //To change body of implemented methods use File | Settings | File Templates. + + AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + MessageStore store = getVirtualHost().getMessageStore(); + + + List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() + { + + public boolean accept(QueueEntry entry) + { + final long messageId = entry.getMessage().getMessageId(); + return (messageId >= fromMessageId) + && (messageId <= toMessageId) + && entry.acquire(); + } + + public boolean filterComplete() + { + return false; + } + }); + + + try + { + store.beginTran(storeContext); + + // Move the messages in on the message store. + for (QueueEntry entry : entries) + { + AMQMessage message = entry.getMessage(); + + if(message.isPersistent() && toQueue.isDurable()) + { + store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId()); + } + // dequeue does not decrement the refence count + entry.dequeue(storeContext); + } + + // Commit and flush the move transcations. + try + { + store.commitTran(storeContext); + } + catch (AMQException e) + { + throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e); + } + } + catch (AMQException e) + { + try + { + store.abortTran(storeContext); + } + catch (AMQException rollbackEx) + { + _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx); + } + throw new RuntimeException(e); + } + + try + { + for (QueueEntry entry : entries) + { + toQueue.enqueue(storeContext, entry.getMessage()); + + } + } + catch (MessageCleanupException e) + { + throw new RuntimeException(e); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + + } - public void copyMessagesToAnotherQueue(long fromMessageId, - long toMessageId, + public void copyMessagesToAnotherQueue(final long fromMessageId, + final long toMessageId, String queueName, - StoreContext storeContext) + final StoreContext storeContext) { - //To change body of implemented methods use File | Settings | File Templates. + AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + MessageStore store = getVirtualHost().getMessageStore(); + + + List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() + { + + public boolean accept(QueueEntry entry) + { + final long messageId = entry.getMessage().getMessageId(); + if((messageId >= fromMessageId) + && (messageId <= toMessageId)) + { + if(!entry.isDeleted()) + { + return entry.getMessage().incrementReference(); + } + } + + return false; + } + + public boolean filterComplete() + { + return false; + } + }); + + try + { + store.beginTran(storeContext); + + // Move the messages in on the message store. + for (QueueEntry entry : entries) + { + AMQMessage message = entry.getMessage(); + + if(message.isReferenced() && message.isPersistent() && toQueue.isDurable()) + { + store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId()); + } + } + + // Commit and flush the move transcations. + try + { + store.commitTran(storeContext); + } + catch (AMQException e) + { + throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e); + } + } + catch (AMQException e) + { + try + { + store.abortTran(storeContext); + } + catch (AMQException rollbackEx) + { + _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx); + } + throw new RuntimeException(e); + } + + try + { + for (QueueEntry entry : entries) + { + if(entry.getMessage().isReferenced()) + { + toQueue.enqueue(storeContext, entry.getMessage()); + } + } + } + catch (MessageCleanupException e) + { + throw new RuntimeException(e); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + + } public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext) { - //To change body of implemented methods use File | Settings | File Templates. - } + try + { + QueueEntryIterator queueListIterator = _entries.iterator(); - public void enqueueMovedMessages(final StoreContext storeContext, final List<QueueEntry> foundMessagesList) - { - //To change body of implemented methods use File | Settings | File Templates. - } + while(queueListIterator.advance()) + { + QueueEntry node = queueListIterator.getNode(); + + final long messageId = node.getMessage().getMessageId(); + + if((messageId >= fromMessageId) + && (messageId <= toMessageId) + && !node.isDeleted() + && node.acquire()) + { + node.discard(storeContext); + } + } + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } public void quiesce() { @@ -868,10 +1055,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if(!node.isDeleted() && node.acquire()) { - node.dequeue(storeContext); - - node.dispose(storeContext); - + node.discard(storeContext); noDeletes = false; } @@ -889,10 +1073,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if(!node.isDeleted() && node.acquire()) { - node.dequeue(storeContext); - - node.dispose(storeContext); - + node.discard(storeContext); count++; } @@ -1046,9 +1227,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if(node.acquire()) { final StoreContext reapingStoreContext = new StoreContext(); - node.dequeue(reapingStoreContext); - node.dispose(reapingStoreContext); - + node.discard(reapingStoreContext); } } QueueEntry newNode = _entries.next(node); @@ -1209,10 +1388,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if(!node.isDeleted() && node.expired() && node.acquire()) { - node.dequeue(storeContext); - - node.dispose(storeContext); - + node.discard(storeContext); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index 711497c799..fb70984d99 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -203,12 +203,15 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime); - _messageStore.storeMessageMetaData(storeContext, _messageId, mmd); + _persistent = contentHeaderBody.properties instanceof BasicContentHeaderProperties && ((BasicContentHeaderProperties) contentHeaderBody.properties).getDeliveryMode() == 2; + _messageStore.storeMessageMetaData(storeContext, _messageId, mmd); + + populateFromMessageMetaData(mmd); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java index 23aaf56876..83e348b9f2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java @@ -97,6 +97,10 @@ public class ConnectorConfiguration defaultValue = "false") public boolean _multiThreadNIO; + @Configured(path = "advanced.useWriteBiasedPool", + defaultValue = "true") + public boolean useBiasedWrites; + public IoAcceptor createAcceptor() { |